Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-05-27 Thread via GitHub


kamalcph closed pull request #15634: KAFKA-16452: Bound high-watermark offset 
to range between LLSO and LEO 
URL: https://github.com/apache/kafka/pull/15634


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-05-27 Thread via GitHub


showuon commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2133081717

   @kamalcph , could we close this PR since 
https://github.com/apache/kafka/pull/15825 is merged?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-05-01 Thread via GitHub


junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1586840549


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > I'm not clear on this:
   > 
   > 1. Segments are eligible for upload to remote storage only when the lastStableOffset moves beyond the end offset of the segment to be uploaded.
   > 2. When all the replicas lose local data (offline partition), we consider the data in remote storage lost as well. Currently, there is no provision to serve the remote data in this case.
   > 3. When firstUnstableOffsetMetadata is empty, we return highWatermark. With this patch, the highWatermark lower boundary is set to localLogStartOffset, so there won't be an issue.
   > 
   
   That's true. It's just that this is yet another offset that we need to bound. I am also not sure whether there are other side effects of adjusting the HWM and LSO.
   
   Left some comments on https://github.com/apache/kafka/pull/15825.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1582149811


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Opened draft PR #15825 with the suggested approach. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1582144306


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > For example, if we lose the local data in all replicas, the 
lastStableOffset could still be in the middle of a tiered segment and moving it 
to localLogStartOffset immediately will be incorrect.
   
   I'm not clear on this:
   
   1. Segments are eligible for upload to remote storage only when the `lastStableOffset` moves beyond the end offset of the segment to be uploaded.
   2. When all the replicas lose local data (offline partition), we consider the data in remote storage lost as well. Currently, there is no provision to serve the remote data in this case.
   3. When `firstUnstableOffsetMetadata` is empty, we return `highWatermark`. With this patch, the `highWatermark` lower boundary is set to `localLogStartOffset`, so there won't be an issue.
   
   > Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) 
is only used in DelayedFetch for estimating the amount of available bytes.
   
   The [LogOffsetMetadata#onOlderSegment](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java?L54) method is used in the [hot path](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L324) of incrementing the high-watermark and expects full offset metadata; otherwise it throws an error. Is it OK to remove the throw from the LogOffsetMetadata#onOlderSegment method and return `false` when only `messageOffsetOnly` metadata is available?
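   For illustration, a minimal Scala sketch of the proposed semantics (the real class is the Java LogOffsetMetadata; the names below mirror it, but this is not the actual implementation):

   ```scala
   // Sketch only: illustrates returning false instead of throwing when either side
   // carries only a message offset (no segment position).
   case class OffsetMetadataSketch(messageOffset: Long,
                                   segmentBaseOffset: Long = -1L,
                                   relativePositionInSegment: Int = -1) {

     def messageOffsetOnly: Boolean = segmentBaseOffset == -1L

     // Proposed: fall back to false when full metadata is missing, so callers such as
     // DelayedFetch treat the two offsets as if they were in the same segment.
     def onOlderSegment(that: OffsetMetadataSketch): Boolean =
       if (this.messageOffsetOnly || that.messageOffsetOnly) false
       else this.segmentBaseOffset < that.segmentBaseOffset
   }
   ```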



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1582144306


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > For example, if we lose the local data in all replicas, the 
lastStableOffset could still be in the middle of a tiered segment and moving it 
to localLogStartOffset immediately will be incorrect.
   
   I'm not clear on this:
   
   1. Segments are eligible for upload to remote storage only when the `lastStableOffset` moves beyond the end offset of the segment to be uploaded.
   2. When all the replicas lose local data (offline partition), we consider the data in remote storage lost as well. Currently, there is no provision to serve the remote data in this case.
   3. When `firstUnstableOffsetMetadata` is empty, we return `highWatermark`. With this patch, the `highWatermark` lower boundary is set to `localLogStartOffset`, so there won't be an issue.
   
   > Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) 
is only used in DelayedFetch for estimating the amount of available bytes.
   
   The [LogOffsetMetadata#onOlderSegment](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java?L54) method is used in the [hot path](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L324) of incrementing the high-watermark and expects full offset metadata; otherwise it throws an error. Is it OK to remove the throw from the LogOffsetMetadata#onOlderSegment method and return `false` by default?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1582144306


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > For example, if we lose the local data in all replicas, the 
lastStableOffset could still be in the middle of a tiered segment and moving it 
to localLogStartOffset immediately will be incorrect.
   
   I'm not clear on this:
   
   1. Segments are eligible for upload to remote storage only when the `lastStableOffset` moves beyond the end offset of the segment to be uploaded.
   2. When all the replicas lose local data (offline partition), we consider the data in remote storage lost as well. Currently, there is no provision to serve the remote data in this case.
   3. When `firstUnstableOffsetMetadata` is empty, we return `highWatermark`. With this patch, the `highWatermark` lower boundary is set to `localLogStartOffset`, so there won't be an issue.
   
   > Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) 
is only used in DelayedFetch for estimating the amount of available bytes.
   
   The [LogOffsetMetadata#onOlderSegment](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java?L54) method is used in the [hot path](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L324) of incrementing the high-watermark and expects full offset metadata; otherwise it throws an error. Is it OK to remove the throw from the LogOffsetMetadata#onOlderSegment method and return `false` by default?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-18 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1571126880


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Thanks for suggesting the alternative approach. I'll check and come back on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-18 Thread via GitHub


kamalcph commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2064647379

   Thanks @chia7712 for the review!
   
   > log-start-offset-checkpoint is missing and remote storage is enabled. The logStartOffset will be set to zero, and it seems to be a potential issue since the ListOffsetRequest could return an incorrect result
   
   Most of the time, when the follower joins the ISR, it updates the log-start-offset and high-watermark from the leader FETCH response. The issue can happen only when the follower gets elected as leader before updating its state, as mentioned in the summary/comments.
   
   When the `log-start-offset-checkpoint` file is missing:
   1. For a normal topic, the log-start-offset will be set to the base offset of the first log segment, so there is no issue. Since the data is there, reads won't fail.
   2. For a remote topic, the log-start-offset will be stale for some time until the RemoteLogManager [updates](https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L671) it, so the issue is intermittent and self-recovers.
   
   > replication-offset-checkpoint is missing and remote storage is enabled. This is what you described. The HWM points to the middle of tiered storage, so fetching records from local segments causes an error.
   
   This is not an issue for a normal topic. But for a cluster with remote storage enabled, if the issue happens on even one partition, it starts to affect a *subset* of topics. The controller batches the partitions in the LeaderAndIsr request; if the broker fails to process the LISR for one partition, then the remaining partitions in that batch won't be processed. The producers producing to those topics will start receiving the NOT_LEADER_FOR_PARTITION error.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2063727744

   Sorry, the story I mentioned above seems to be another issue. Let me summarize my thoughts.
   
   1. `log-start-offset-checkpoint` is missing and remote storage is enabled. The `logStartOffset` will be set to zero, and it seems to be a potential issue since the `ListOffsetRequest` could return an incorrect result.
   2. `replication-offset-checkpoint` is missing and remote storage is enabled. This is what you described. The HWM points to the middle of tiered storage, so fetching records from local segments causes an error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2063435567

   > HWM is set to localLogStartOffset in [UnifiedLog#updateLocalLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L162), then we load the HWM from the checkpoint file in [Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495).
   If the HWM checkpoint file is missing or does not contain the entry for the partition, then the default value of 0 is taken. If 0 < LogStartOffset (LSO), then the LSO is assumed as the HWM. Thus, the non-monotonic update of the high watermark from LLSO to LSO can happen.
   
   Pardon me. I'm a bit confused about this. Please feel free to correct me to 
help me catch up :smile: 
   
   ### case 0: the checkpoint file is missing and the remote storage is 
**disabled**
   The LSO is initialized to LLSO
   
   
https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogLoader.scala#L180
   
   so I can't understand why the non-monotonic update happens. After all, LLSO and LSO are the same in this scenario.
   
   ### case 1: the checkpoint file is missing and the remote storage is 
**enabled**
   The LSO is initialized to `logStartOffsetCheckpoint`, which is 0 since there are no checkpoint files.
   
   
https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogLoader.scala#L178
   
   And then the HWM will be updated to the LLSO, which is larger than zero.
   
   
https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/UnifiedLog.scala#L172
   
   And this could be a problem when [Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495) gets called, since the HWM is changed from the LLSO (non-zero) to the LSO (zero). Also, the incorrect HWM causes an error in `convertToOffsetMetadataOrThrow`.
   
   If I understand correctly, it seems the root cause is that "when the checkpoint files are not working, we will initialize a `UnifiedLog` with an incorrect LSO".
   
   And so, could we fix that by rebuilding `logStartOffsets` according to the remote storage when the checkpoint is not working (https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogManager.scala#L459)?
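   For what it's worth, a rough Scala sketch of the initialization being discussed (paraphrasing the LogLoader lines linked above) together with the suggested fallback; `earliestRemoteOffset` is a hypothetical hook for querying the tiered-storage metadata, not an existing API:

   ```scala
   // Sketch only: how the log-start-offset could be derived at load time.
   def initialLogStartOffset(remoteStorageEnabled: Boolean,
                             logStartOffsetCheckpoint: Option[Long],   // None => checkpoint file missing
                             firstLocalSegmentBaseOffset: Long,        // i.e. the LLSO
                             earliestRemoteOffset: () => Option[Long]  // hypothetical tiered-metadata lookup
                            ): Long = {
     val checkpointed = logStartOffsetCheckpoint.getOrElse(0L)
     if (!remoteStorageEnabled) {
       // case 0: local-only log, the LSO can never be below the first local segment,
       // so a missing checkpoint is harmless.
       math.max(checkpointed, firstLocalSegmentBaseOffset)
     } else if (logStartOffsetCheckpoint.isDefined) {
       checkpointed
     } else {
       // case 1 + proposed fix: rebuild the LSO from remote metadata instead of
       // silently falling back to 0.
       earliestRemoteOffset().getOrElse(firstLocalSegmentBaseOffset)
     }
   }
   ```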


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-17 Thread via GitHub


junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1569562746


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   @kamalcph : Thanks for the explanation. I understand the problem now.
   
   As for the fix, it seems that it could work for the HWM. However, I am not sure that we could always do the same thing for the LastStableOffset. For example, if we lose the local data in all replicas, the lastStableOffset could still be in the middle of a tiered segment and moving it to localLogStartOffset immediately will be incorrect.
   
   Here is another potential approach. Note that OffsetMetadata 
(segmentBaseOffset and relativePositionInSegment) is only used in DelayedFetch 
for estimating the amount of available bytes. If occasionally OffsetMetadata is 
not available, we don't have to force an exception in 
convertToOffsetMetadataOrThrow(). Instead, we can leave the OffsetMetadata as 
empty and just use a conservative 1 byte for estimating the amount of available 
bytes. This approach will apply to both HWM and LSO.  The inaccurate byte 
estimate will be ok as long as it's infrequent. What do you think?
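   Roughly what that could look like on the DelayedFetch side; a simplified sketch only (the type and field names below are made up for illustration, not the actual classes):

   ```scala
   // Sketch of the conservative estimate: when the fetch upper bound (HWM or LSO) carries
   // only a message offset, don't force a conversion to full metadata; assume at least
   // one byte is available whenever the offsets differ.
   final case class PositionSketch(messageOffset: Long,
                                   segmentBaseOffset: Option[Long],
                                   positionInSegment: Option[Int])

   def estimateAvailableBytes(fetchPos: PositionSketch, upperBound: PositionSketch): Long =
     (fetchPos.segmentBaseOffset, upperBound.segmentBaseOffset) match {
       case (Some(a), Some(b)) if a == b =>
         // Full metadata on both sides and same segment: exact byte distance.
         (upperBound.positionInSegment.get - fetchPos.positionInSegment.get).toLong
       case _ if upperBound.messageOffset > fetchPos.messageOffset =>
         // No full metadata (or different segments, in this simplified sketch):
         // fall back to a conservative 1-byte estimate.
         1L
       case _ => 0L
     }
   ```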



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-17 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1569356239


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1223,6 +1223,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         s"but we only have log segments starting from offset: $logStartOffset.")
   }
 
+  private def checkLocalLogStartOffset(offset: Long): Unit = {

Review Comment:
   Agreed. The `checkLocalLogStartOffset` is used only in the `convertToOffsetMetadataOrThrow` method, which reads from local disk.
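   For reference, a sketch of what such a guard looks like (paraphrased as a standalone function; the actual method in the PR is a private member of UnifiedLog and the exact message text may differ):

   ```scala
   import org.apache.kafka.common.errors.OffsetOutOfRangeException

   // Sketch: the local-log analogue of checkLogStartOffset, used only on the
   // convertToOffsetMetadataOrThrow path, which has to resolve a local segment.
   def checkLocalLogStartOffset(offset: Long, localLogStartOffset: Long, partition: String): Unit = {
     if (offset < localLogStartOffset)
       throw new OffsetOutOfRangeException(s"Received request for offset $offset for partition $partition, " +
         s"but we only have local log segments starting from offset: $localLogStartOffset.")
   }
   ```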



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-17 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before 
caught up through unclean election. Is this the case that you want to address?
   
   yes, we want to address this case too. And, the issue can also happen during 
clean preferred-leader-election:
   
   ```
   Call stack: The replica (1002) has full data but HW is invalid, then the 
fetch-offset will be equal to LeaderLog(1001).highWatermark
   
   Leader (1001):
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLocalLog
   Partition.fetchRecords
   Partition.updateFollowerFetchState
   Partition.maybeExpandIsr
   Partition.submitAlterPartition
   ...
   ...
   ...
   # If there is not enough data to respond and there is no remote 
data, we will let the fetch request wait for new data.
   # parks the request in the DelayedFetchPurgatory
   
   
   Another thread runs Preferred-Leader-Election in the controller (1003); since replica 1002 joined the ISR list, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.
   
   KafkaController.processReplicaLeaderElection
   KafkaController.onReplicaElection
   PartitionStateMachine.handleStateChanges
   PartitionStateMachine.doHandleStateChanges
   PartitionStateMachine.electLeaderForPartitions
   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 got elected as leader and has an invalid highWatermark since it didn't process the fetch-response from the previous leader 1001; it throws an OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that in a LeaderAndIsr request, even if one partition fails, the remaining partitions in that request won't be processed.
   
   KafkaApis.handleLeaderAndIsrRequest
   ReplicaManager.becomeLeaderOrFollower
   ReplicaManager.makeLeaders
   Partition.makeLeader
   Partition.maybeIncrementLeaderHW
   UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
   UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the LISR. The controller retries the LISR until broker 1002 becomes the leader for tp0. During this time, the producers won't be able to send messages, as node 1002 sends the NOT_LEADER_FOR_PARTITION error code to the producer.
   
   During this time, if a follower sends a FETCH request to read from the current leader 1002, then an OFFSET_OUT_OF_RANGE error will be returned by the leader:
   
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLog
   Partition.fetchRecords
   # readFromLocalLog
   Partition.updateFollowerFetchState
   Partition.maybeIncrementLeaderHW
   LeaderLog.maybeIncrementHighWatermark
   UnifiedLog.fetchHighWatermarkMetadata
   UnifiedLog.convertToOffsetMetadataOrThrow
   LocalLog.convertToOffsetMetadataOrThrow
   LocalLog.read
   # OffsetOutOfRangeException exception
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about 

Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-17 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1569328393


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > For the makeLeaders path, it will call 
UnifiedLog.convertToOffsetMetadataOrThrow. Within it, 
checkLogStartOffset(offset) shouldn't throw OFFSET_OUT_OF_RANGE since we are 
comparing the offset with logStartOffset. Do you know which part throws 
OFFSET_OUT_OF_RANGE error?
   
   The next line, `localLog.convertToOffsetMetadataOrThrow`, in the [convertToOffsetMetadataOrThrow](https://sourcegraph.com/github.com/apache/kafka@a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L1429) method reads the segment from disk, and that is where it throws the error. The call to [segments.floorSegment(startOffset)](https://sourcegraph.com/github.com/apache/kafka@a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/-/blob/core/src/main/scala/kafka/log/LocalLog.scala?L366) in LocalLog fails to find the segment for the log-start-offset, and then OffsetOutOfRangeException is thrown.
   
   > For the follower fetch path, it's bounded by LogEndOffset. So it shouldn't 
need to call UnifiedLog.fetchHighWatermarkMetadata, right? The regular consumer 
will call UnifiedLog.fetchHighWatermarkMetadata.
   
   Yes, you're right. I attached the wrong call stack for handling the follower request. Please find the updated call stack below.
   
   The leader with an invalid high-watermark handles the FETCH request from the follower and throws an OFFSET_OUT_OF_RANGE error:
   
   ```
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLog
   Partition.fetchRecords
   # readFromLocalLog
   Partition.updateFollowerFetchState
   Partition.maybeIncrementLeaderHW
   LeaderLog.maybeIncrementHighWatermark
   UnifiedLog.fetchHighWatermarkMetadata
   UnifiedLog.convertToOffsetMetadataOrThrow
   LocalLog.convertToOffsetMetadataOrThrow
   LocalLog.read
   # OffsetOutOfRangeException exception
   
   ```
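   To make the failure mode concrete, a condensed Scala sketch of that lookup (simplified types; the real logic is split across UnifiedLog and LocalLog):

   ```scala
   import org.apache.kafka.common.errors.OffsetOutOfRangeException

   // Sketch only: resolving a message-only offset to full metadata needs a local segment.
   // If the offset sits below the local-log-start-offset, the floor-segment lookup finds
   // nothing and the read path throws, which is what the call stack above runs into.
   final case class SegmentSketch(baseOffset: Long)

   def resolveToSegment(offset: Long, localSegments: Seq[SegmentSketch]): SegmentSketch = {
     // roughly what segments.floorSegment(startOffset) does: latest segment with baseOffset <= offset
     val candidates = localSegments.filter(_.baseOffset <= offset)
     if (candidates.isEmpty)
       throw new OffsetOutOfRangeException(s"Cannot resolve offset $offset: no local segment at or below it.")
     candidates.maxBy(_.baseOffset)
   }

   // e.g. with local segments at base offsets 31 and 41, resolveToSegment(5, ...) throws,
   // which is the state a stale high watermark of 5 puts the leader in here.
   ```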
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-17 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before 
caught up through unclean election. Is this the case that you want to address?
   
   yes, we want to address this case too. And, the issue can also happen during 
clean preferred-leader-election:
   
   ```
   Call stack: The replica (1002) has full data but HW is invalid, then the 
fetch-offset will be equal to LeaderLog(1001).highWatermark
   
   Leader (1001):
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLocalLog
   Partition.fetchRecords
   Partition.updateFollowerFetchState
   Partition.maybeExpandIsr
   Partition.submitAlterPartition
   ...
   ...
   ...
   # If there is not enough data to respond and there is no remote 
data, we will let the fetch request wait for new data.
   # parks the request in the DelayedFetchPurgatory
   
   
   Another thread, runs Preferred-Leader-Election in controller (1003), since 
the replica 1002 joined the ISR list, it can be elected as the preferred 
leader. The controller sends LeaderAndIsr requests to all the brokers.
   
   KafkaController.processReplicaLeaderElection
   KafkaController.onReplicaElection
   PartitionStateMachine.handleStateChanges
   PartitionStateMachine.doHandleStateChanges
   PartitionStateMachine.electLeaderForPartitions
   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 got elected as Leader and have invalid highWatermark since it 
didn't process the fetch-response from the previous leader 1001, throws 
OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that 
in LeaderAndIsr request even if one partition fails, then the remaining 
partitions in that request won't be processed.
   
   KafkaApis.handleLeaderAndIsrRequest
   ReplicaManager.becomeLeaderOrFollower
   ReplicaManager.makeLeaders
   Partition.makeLeader
   Partition.maybeIncrementLeaderHW
   UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
   UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current-leader for the tp0 is 1002, but the 
broker 1002 couldn't process the LISR. The controller retries the LISR until 
the broker 1002 becomes leader for tp0. During this time, the producers won't 
be able to send messages, as the node 1002, sends NOT_LEADER_FOR_PARTITION 
error-code to the producer.
   
   During this time, if a follower sends the FETCH request to read from the 
current-leader 1002, then OFFSET_OUT_OF_RANGE error will be returned by the 
leader:
   
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLog
   Partition.fetchRecords
   Partition.readRecords
   UnifiedLog.read 
   UnifiedLog.fetchHighWatermarkMetadata
   UnifiedLog.convertToOffsetMetadataOrThrow
   LocalLog.convertToOffsetMetadataOrThrow
   LocalLog.read
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-16 Thread via GitHub


junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1567796843


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Thanks for the detailed explanation.
   
   For the `makeLeaders` path, it will call 
`UnifiedLog.convertToOffsetMetadataOrThrow`. Within it, 
`checkLogStartOffset(offset)` shouldn't throw OFFSET_OUT_OF_RANGE since we are 
comparing the offset with logStartOffset. Do you know which part throws 
OFFSET_OUT_OF_RANGE error?
   
   For the follower fetch path, it's bounded by `LogEndOffset`. So it shouldn't 
need to call `UnifiedLog.fetchHighWatermarkMetadata`, right? The regular 
consumer will call `UnifiedLog.fetchHighWatermarkMetadata`.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1567414802


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1223,6 +1223,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         s"but we only have log segments starting from offset: $logStartOffset.")
   }
 
+  private def checkLocalLogStartOffset(offset: Long): Unit = {

Review Comment:
   It seems reading records between [logStartOffset, localLogStartOffset] is dangerous since the segment won't be on local disk. That is a bit chaotic to me, as `UnifiedLog` presents a unified view of local and tiered log segments (https://github.com/apache/kafka/blob/fccd7fec666d6570758e0b7891771099240ceee8/core/src/main/scala/kafka/log/UnifiedLog.scala#L59). The check looks like a limitation in that we can't "view" data from a tiered log segment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-13 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before 
caught up through unclean election. Is this the case that you want to address?
   
   yes, we want to address this case too. And, the issue can also happen during 
clean preferred-leader-election:
   
   ```
   Call stack: The replica (1002) has full data but HW is invalid, then the 
fetch-offset will be equal to LeaderLog(1001).highWatermark
   
   Leader (1001):
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLocalLog
   Partition.fetchRecords
   Partition.updateFollowerFetchState
   Partition.maybeExpandIsr
   Partition.submitAlterPartition
   ...
   ...
   ...
   # If there is not enough data to respond and there is no remote 
data, we will let the fetch request wait for new data.
   # parks the request in the DelayedFetchPurgatory
   
   
   Another thread runs Preferred-Leader-Election in the controller (1003); since replica 1002 joined the ISR list, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.
   
   KafkaController.processReplicaLeaderElection
   KafkaController.onReplicaElection
   PartitionStateMachine.handleStateChanges
   PartitionStateMachine.doHandleStateChanges
   PartitionStateMachine.electLeaderForPartitions
   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 got elected as leader and has an invalid highWatermark since it didn't process the fetch-response from the previous leader 1001; it throws an OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that in a LeaderAndIsr request, even if one partition fails, the remaining partitions in that request won't be processed.
   
   KafkaApis.handleLeaderAndIsrRequest
   ReplicaManager.becomeLeaderOrFollower
   ReplicaManager.makeLeaders
   Partition.makeLeader
   Partition.maybeIncrementLeaderHW
   UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
   UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the LISR. The controller retries the LISR until broker 1002 becomes the leader for tp0. During this time, the producers won't be able to send messages, as node 1002 sends the NOT_LEADER_FOR_PARTITION error code to the producer.
   
   During this time, if a follower sends a FETCH request to read from the current leader 1002, then an OFFSET_OUT_OF_RANGE error will be returned:
   
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLog
   Partition.fetchRecords
   Partition.readRecords
   UnifiedLog.read 
   UnifiedLog.fetchHighWatermarkMetadata
   UnifiedLog.convertToOffsetMetadataOrThrow
   LocalLog.convertToOffsetMetadataOrThrow
   LocalLog.read
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-13 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before 
caught up through unclean election. Is this the case that you want to address?
   
   yes, we want to address this case too. And, the issue can also happen during 
clean preferred-leader-election:
   
   ```
   Call stack: The replica (1002) has full data but HW is invalid, then the 
fetch-offset will be equal to LeaderLog(1001).highWatermark
   
   Leader (1001):
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLocalLog
   Partition.fetchRecords
   Partition.updateFollowerFetchState
   Partition.maybeExpandIsr
   Partition.submitAlterPartition
   ...
   ...
   ...
   # If there is not enough data to respond and there is no remote 
data, we will let the fetch request wait for new data.
   # parks the request in the DelayedFetchPurgatory
   
   
   Another thread runs Preferred-Leader-Election in the controller (1003); since replica 1002 joined the ISR list, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.
   
   KafkaController.processReplicaLeaderElection
   KafkaController.onReplicaElection
   PartitionStateMachine.handleStateChanges
   PartitionStateMachine.doHandleStateChanges
   PartitionStateMachine.electLeaderForPartitions
   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 got elected as leader and has an invalid highWatermark since it didn't process the fetch-response from the previous leader 1001; it throws an OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that in a LeaderAndIsr request, even if one partition fails, the remaining partitions in that request won't be processed.
   
   KafkaApis.handleLeaderAndIsrRequest
   ReplicaManager.becomeLeaderOrFollower
   ReplicaManager.makeLeaders
   Partition.makeLeader
   Partition.maybeIncrementLeaderHW
   UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
   UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the LISR. The controller retries the LISR until broker 1002 becomes the leader for tp0. During this time, the producers won't be able to send messages, as node 1002 sends the NOT_LEADER_FOR_PARTITION error code to the producer.
   
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-13 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before 
caught up through unclean election. Is this the case that you want to address?
   
   yes, we want to address this case too. And, the issue can also happen during 
clean preferred-leader-election:
   
   ```
   Call stack: The replica (1002) has full data but HW is invalid, then the 
fetch-offset will be equal to LeaderLog(1001).highWatermark
   
   Leader (1001):
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLocalLog
   Partition.fetchRecords
   Partition.updateFollowerFetchState
   Partition.maybeExpandIsr
   Partition.submitAlterPartition
   ...
   ...
   ...
   # If there is not enough data to respond and there is no remote 
data, we will let the fetch request wait for new data.
   # parks the request in the DelayedFetchPurgatory
   
   
   Another thread runs Preferred-Leader-Election in the controller (1003); since replica 1002 joined the ISR list, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.
   
   KafkaController.processReplicaLeaderElection
   KafkaController.onReplicaElection
   PartitionStateMachine.handleStateChanges
   PartitionStateMachine.doHandleStateChanges
   PartitionStateMachine.electLeaderForPartitions
   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 got elected as leader and has an invalid highWatermark since it didn't process the fetch-response from the previous leader 1001; it throws an OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that if the LeaderAndIsr request fails to process for one partition, then the remaining partitions in that request won't be processed.
   
   KafkaApis.handleLeaderAndIsrRequest
   ReplicaManager.becomeLeaderOrFollower
   ReplicaManager.makeLeaders
   Partition.makeLeader
   Partition.maybeIncrementLeaderHW
   UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
   UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the LISR. The controller retries the LISR until the broker becomes the leader for tp0. During this time, the producers won't be able to send messages, as node 1002 sends the NOT_LEADER_FOR_PARTITION error code to the producer.
   
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-12 Thread via GitHub


junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1562991673


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
   * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Yes, if replication-offset-checkpoint is corrupted, HWM could temporarily be 
set to below local-log-start-offset. I am still trying to understand the impact 
of that. In the common case, the restarted broker can't become the leader or 
serve reads until it's caught up. At that time, the HWM will be up to date. In 
the rare case, the restarted broker is elected as the leader before caught up 
through unclean election. Is this the case that you want to address?
   
   The jira also says:
   
   > If the high watermark is less than the local-log-start-offset, then the 
[UnifiedLog#fetchHighWatermarkMetadata](https://sourcegraph.com/github.com/apache/kafka@d4caa1c10ec81b9c87eaaf52b73c83d5579b68d3/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L358)
 method will throw the OFFSET_OUT_OF_RANGE error when it converts the offset to 
metadata. Once this error happens, the followers will receive out-of-range 
exceptions and the producers won't be able to produce messages since the leader 
cannot move the high watermark.
   
   However, the follower read is bounded by logEndOffset, not HWM? Where does 
the follower read need to convert HWM to metadata?



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -318,6 +318,80 @@ class UnifiedLogTest {
 assertHighWatermark(4L)
   }
 
+  @Test
+  def testHighWatermarkMaintenanceForRemoteTopic(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    val leaderEpoch = 0
+
+    def assertHighWatermark(offset: Long): Unit = {
+      assertEquals(offset, log.highWatermark)
+      assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+    }
+
+    // High watermark initialized to 0
+    assertHighWatermark(0L)
+
+    var offset = 0L
+    for(_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+
+    // High watermark not changed by append
+    assertHighWatermark(0L)
+
+    // Update high watermark as leader
+    log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L))
+    assertHighWatermark(50L)
+    assertEquals(50L, log.logEndOffset)
+
+    // Cannot update high watermark past the log end offset
+    log.updateHighWatermark(60L)
+    assertHighWatermark(50L)
+
+    // simulate calls to upload 3 segments to remote storage and remove them from local-log.
+    log.updateHighestOffsetInRemoteStorage(30)
+    log.maybeIncrementLocalLogStartOffset(31L, LogStartOffsetIncrementReason.SegmentDeletion)
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size)
+    assertEquals(31L, log.localLogStartOffset())
+    assertHighWatermark(50L)
+
+    // simulate one remote-log segment deletion
+    val logStartOffset = 11L
+    log.maybeIncrementLogStartOffset(logStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
+    assertEquals(11, log.logStartOffset)
+
+    // Updating the HW below the log-start-offset / local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
+    log.updateHighWatermark(new LogOffsetMetadata(5L))
+    assertHighWatermark(31L)
+    // Updating the HW between log-start-offset and local-log-start-offset is not allowed. HW should reset to local-log-start-offset.

Review Comment:
   This is moving HW below local-log-start-offset, not log-start-offset.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-11 Thread via GitHub


kamalcph commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2050934793

   @junrao @showuon @divijvaidya 
   
   Gentle bump to review the diff, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-09 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1557089094


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > when will we set HWM to be lower than _localLogStartOffset?
   
   This can happen when the partition is recovered after an ungraceful shutdown and
   the replication-offset-checkpoint file is missing or corrupted. When the broker
   comes online, the HWM is first set to localLogStartOffset in
   [UnifiedLog#updateLocalLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L162),
   and then we load the HWM from the checkpoint file in
   [Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495).
   
   If the HWM checkpoint file is missing or does not contain an entry for the
   partition, the default value of 0 is taken. If 0 < LogStartOffset (LSO), then the
   LSO is assumed as the HWM. Thus, a non-monotonic update of the high watermark
   from LLSO down to LSO can happen.
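   
   A toy model of that ordering (illustrative values and names only; the real path goes through the two methods linked above):
   
```scala
object HwmRecoverySketch extends App {
  val logStartOffset      = 11L
  val localLogStartOffset = 31L

  // Step 1: while loading the log, the HWM is initialized to the local-log-start-offset.
  var highWatermark = localLogStartOffset

  // Step 2: the checkpointed HWM is applied next; a missing or corrupted
  // replication-offset-checkpoint entry falls back to 0, and the old lower
  // bound only raises that to the log-start-offset.
  val checkpointedHwm = 0L // no entry for this partition
  highWatermark = math.max(checkpointedHwm, logStartOffset)

  println(highWatermark) // 11 -- a non-monotonic move from 31 down to 11
}
```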



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-08 Thread via GitHub


junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1556593959


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Hmm, when will we set HWM to be lower than _localLogStartOffset?
   
   In `UnifiedLog.deletableSegments()`, we have the following code that bounds
   the retention-based deletion by highWatermark. When updating highWatermark, the
   value typically increases.
   `val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)`
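   
   A rough, self-contained illustration (simplified, with made-up names; not the actual `deletableSegments()` code) of how an HWM that comes back too low would block retention-based deletion:
   
```scala
object RetentionGateSketch extends App {
  // A segment is represented by its base offset and the offset after its last record.
  final case class Segment(baseOffset: Long, nextOffset: Long)

  val segments      = Seq(Segment(0L, 11L), Segment(11L, 21L), Segment(21L, 31L))
  val highWatermark = 5L // e.g. an HWM that came back too low after a restart
  def retentionExpired(s: Segment): Boolean = true // assume time/size retention says "delete"

  // Mirrors the quoted predicate: a segment is deletable only if the HWM has
  // moved past its upper bound and the retention predicate itself holds.
  val deletable = segments.takeWhile(s => highWatermark >= s.nextOffset && retentionExpired(s))

  println(deletable) // List(): nothing is deletable while the HWM sits at 5
}
```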



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-01 Thread via GitHub


satishd commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1546167963


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
   @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
 
+  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None

Review Comment:
   nit: You can leave this field where it was earlier, as moving it is not really needed for this change.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
   @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
 
+  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
+
+  @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
+
   /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
* not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
* equals the log end offset (which may never happen for a partition under consistent load). This is needed to
* prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
*/
-  @volatile private var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(logStartOffset)
-
-  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
-
-  @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
+  @volatile private var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(_localLogStartOffset)

Review Comment:
   There won't be any functional effect from this change, since `_localLogStartOffset` is initialized with `logStartOffset`. But it is good to use `_localLogStartOffset` here for consistency and to keep this field relevant.
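   
   A minimal sketch (toy values, not broker code) of why the two are interchangeable at construction time but diverge once local segments are deleted, which is when bounding by the local value matters:
   
```scala
object HwmInitSketch extends App {
  // At construction the local log starts where the log starts, so initializing
  // the HWM with either field yields the same value.
  val logStartOffset       = 11L
  var localLogStartOffset  = 11L
  val initialHighWatermark = localLogStartOffset // identical to logStartOffset here

  // The two only diverge after segments are uploaded to remote storage and
  // deleted locally.
  localLogStartOffset = 31L
  println((logStartOffset, localLogStartOffset, initialHighWatermark)) // (11, 31, 11)
}
```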



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -318,6 +318,80 @@ class UnifiedLogTest {
 assertHighWatermark(4L)
   }
 
+  @Test
+  def testHighWatermarkMaintenanceForRemoteTopic(): Unit = {
+val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, remoteLogStorageEnable = true)
+val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+val leaderEpoch = 0
+
+def assertHighWatermark(offset: Long): Unit = {
+  assertEquals(offset, log.highWatermark)
+  assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+}
+
+// High watermark initialized to 0
+assertHighWatermark(0L)
+
+var offset = 0L
+for(_ <- 0 until 50) {
+  val records = TestUtils.singletonRecords("test".getBytes())
+  val info = log.appendAsLeader(records, leaderEpoch)
+  offset = info.lastOffset
+  if (offset != 0 && offset % 10 == 0)
+log.roll()
+}
+assertEquals(5, log.logSegments.size)
+
+// High watermark not changed by append
+assertHighWatermark(0L)
+
+// Update high watermark as leader
+log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L))
+assertHighWatermark(50L)
+assertEquals(50L, log.logEndOffset)
+
+// Cannot update high watermark past the log end offset
+log.updateHighWatermark(60L)
+assertHighWatermark(50L)
+
+// simulate calls to upload 3 segments to remote storage and remove them from local-log.
+log.updateHighestOffsetInRemoteStorage(30)
+log.maybeIncrementLocalLogStartOffset(31L, LogStartOffsetIncrementReason.SegmentDeletion)
+log.deleteOldSegments()
+assertEquals(2, log.logSegments.size)
+assertEquals(31L, log.localLogStartOffset())
+assertHighWatermark(50L)
+
+// simulate one remote-log segment deletion
+val logStartOffset = 11L
+log.maybeIncrementLogStartOffset(logStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
+assertEquals(11, log.logStartOffset)
+
+// Updating the HW below the log-start-offset / local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
+log.updateHighWatermark(new LogOffsetMetadata(5L))
+assertHighWatermark(31L)
+// Updating the HW between log-start-offset and local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
+log.updateHighWatermark(new LogOffsetMetadata(25L))
+assertHighWatermark(31L)
+// Updating the HW between local-log-start-offset and log-end-offset is allowed.
+log.updateHighWatermark(new LogOffsetMetadata(32L))
+assertHighWatermark(32L)
+assertEquals(11L, log.logStartOffset)
+assertEquals(31L, log.localLogStartOffset())
+
+// Truncating the logs to below the local-log-start-offset, should update the high watermark

Review Comment:
   Good to see the truncation scenarios covered as well.



-- 
This is an automated message from the Apache Git Service.

Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-01 Thread via GitHub


kamalcph commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2029474481

   > just curious. Does it happen only if remote storage is enabled? According
   > to the description:
   > 
   > > The follower sends the first FETCH request to the leader, the leader
   > > checks whether the isFollowerInSync, then expands the ISR. Also, parks the
   > > request in DelayedFetchPurgatory. If the replica was elected as leader before
   > > the fetch-response gets processed, then the new-leader will have wrong
   > > high-watermark.
   > 
   > It looks like the issue exists even though we don't use remote storage.
   
   For a normal topic, once the replica becomes the leader, it is able to
   [resolve/convert](https://sourcegraph.com/github.com/apache/kafka@40e87ae35beb389d6419d32130174d7c68fa4d19/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala#L319)
   the high-watermark offset (log-start-offset) to metadata by reading the segment
   from disk, and then it updates the high watermark to either the current leader's
   log-end-offset or the lowest LEO of all the eligible ISR replicas. For a remote
   topic, the replica will fail to
   [resolve](https://sourcegraph.com/github.com/apache/kafka@40e87ae35beb389d6419d32130174d7c68fa4d19/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala#L319)
   the high-watermark offset (log-start-offset) to metadata since the segment won't
   be on local disk, and it then fails continuously.
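   
   To make the difference concrete, a small sketch (toy values and a made-up `resolvableLocally` helper, not the broker code path) of the resolution step that succeeds for a normal topic but keeps failing for a remote one:
   
```scala
object LeaderHwmResolutionSketch extends App {
  // Whether an offset can still be resolved to segment metadata from local disk.
  def resolvableLocally(offset: Long, localLogStartOffset: Long, logEndOffset: Long): Boolean =
    offset >= localLogStartOffset && offset <= logEndOffset

  val hwmAfterReset = 11L // the log-start-offset that the HWM was reset to

  // Normal topic: every retained offset still has a local segment, so the new
  // leader can convert the HWM to metadata and then advance it.
  println(resolvableLocally(hwmAfterReset, localLogStartOffset = 11L, logEndOffset = 50L)) // true

  // Remote topic: offsets below the local-log-start-offset exist only in remote
  // storage, so the conversion keeps failing and the HWM cannot move.
  println(resolvableLocally(hwmAfterReset, localLogStartOffset = 31L, logEndOffset = 50L)) // false
}
```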


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-01 Thread via GitHub


chia7712 commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2029310080

   just curious. Does it happen only if remote storage is enabled? According
   to the description:
   
   > The follower sends the first FETCH request to the leader, the leader
   > checks whether the isFollowerInSync, then expands the ISR. Also, parks the
   > request in DelayedFetchPurgatory. If the replica was elected as leader before
   > the fetch-response gets processed, then the new-leader will have wrong
   > high-watermark.
   
   It looks like the issue exists even though we don't use remote storage.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org