[jira] [Commented] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic

2024-05-27 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849695#comment-17849695
 ] 

Kamal Chandraprakash commented on KAFKA-16780:
--

The issue mentioned above also applies to normal topics on which remote 
storage is not enabled. When the consumer is configured with READ_COMMITTED 
isolation and reads from the beginning of the partition, we scan all the 
transaction indexes to collect the aborted transactions (the indexes would be 
empty if the producer is not a transactional producer). This adds delay in 
responding to the FETCH request when there are a lot of segments/indexes to scan 
(a sketch of the scan pattern is shown below).
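
A minimal sketch of that scan pattern, using simplified placeholder types rather 
than the actual LocalLog/RemoteLogManager classes: the broker consults the 
transaction index of every segment between the fetch position and the upper 
bound and merges the aborted-transaction entries, so the cost grows with the 
number of segments even when every index turns out to be empty.
{code:java}
import java.util.ArrayList;
import java.util.List;

// Simplified placeholders -- not the real Kafka classes.
record AbortedTxn(long producerId, long firstOffset, long lastOffset) { }

interface TxnIndex {
    // Aborted transactions overlapping [fetchOffset, upperBound); empty for non-txn producers.
    List<AbortedTxn> collectAbortedTxns(long fetchOffset, long upperBound);
}

record Segment(long baseOffset, TxnIndex txnIndex) { }

class AbortedTxnScan {
    // Every segment's transaction index between the fetch position and the upper bound
    // is consulted, even if all of them turn out to be empty.
    static List<AbortedTxn> collect(List<Segment> segments, long fetchOffset, long upperBound) {
        List<AbortedTxn> aborted = new ArrayList<>();
        for (Segment segment : segments) {
            if (segment.baseOffset() >= upperBound) {
                break; // segments past the fetch range need not be scanned
            }
            aborted.addAll(segment.txnIndex().collectAbortedTxns(fetchOffset, upperBound));
        }
        return aborted;
    }
}
{code}
The same pattern applies on the remote path, except that each index lookup may 
be a remote fetch.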

[~jolshan] [~chia7712] [~showuon] 

Could you please suggest an approach on how to proceed with this? Thanks!

> Txn consumer exerts pressure on remote storage when reading non-txn topic
> -
>
> Key: KAFKA-16780
> URL: https://issues.apache.org/jira/browse/KAFKA-16780
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> h3. Logic to read aborted txns:
>  # When the consumer sets isolation_level to {{READ_COMMITTED}} and reads 
> a non-txn topic, the broker has to 
> [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
>  all the local log segments to collect the aborted transactions, since there 
> won't be any entry in the transaction index.
>  # The same 
> [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
>  is applied while reading from remote storage. In this case, when the FETCH 
> request reads data from the first remote log segment, it has to 
> fetch the transaction indexes of all the remaining remote-log segments, and 
> then the call lands on the local-log segments before responding to the FETCH 
> request, which increases the time taken to serve the requests.
> The [EoS Abort 
> Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
>  design doc explains how the transaction index file filters out the aborted 
> transaction records.
> The issue arises when consumers use the {{READ_COMMITTED}} isolation 
> level but read non-txn topics. If the topic is transactional, 
> then we expect each transaction to either commit or roll back within 
> 15 minutes (default transaction.max.timeout.ms = 15 mins), so we likely 
> have to search only a few remote log segments to collect the aborted txns.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-05-20 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847876#comment-17847876
 ] 

Kamal Chandraprakash edited comment on KAFKA-16414 at 5/20/24 1:14 PM:
---

We may need a discussion before changing the behavior. One issue that can arise 
is if the user sets the configs below:
{code:java}
segment.bytes = 1 GB
local.retention.bytes = 1 KB
{code}
Going by this JIRA, we will rotate the active segment and mark it eligible for 
local-log deletion. This can lead to a huge number of small segments, which 
impacts the RemoteLogMetadataManager since we have to maintain the metadata for 
all the segments. The impact on the RemoteStorageManager can be an increase in 
the number of file descriptors, etc. A rough estimate of the segment growth is 
sketched below.
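
A back-of-the-envelope sketch of that concern; the throughput and check interval 
below are illustrative assumptions, not measurements. If the active segment were 
rolled on every retention check once local.retention.bytes = 1 KB is breached, 
the roll rate would be bounded by log.retention.check.interval.ms rather than by 
segment.bytes:
{code:java}
public class SegmentProliferationEstimate {
    public static void main(String[] args) {
        long segmentBytes = 1L << 30;            // segment.bytes = 1 GB
        long checkIntervalMs = 5 * 60 * 1000L;   // log.retention.check.interval.ms default (5 min)
        long bytesPerDay = 10L << 30;            // assumed producer throughput: 10 GB/day

        // Today: the active segment rolls only when segment.bytes is reached.
        long segmentsPerDayToday = bytesPerDay / segmentBytes;

        // If it were rolled on every retention check (local.retention.bytes = 1 KB is
        // breached almost immediately at this throughput), the check interval dominates.
        long segmentsPerDayIfRolled = 24L * 60 * 60 * 1000 / checkIntervalMs;

        System.out.printf("segments/day: %d today vs ~%d if rolled on every check%n",
                segmentsPerDayToday, segmentsPerDayIfRolled); // 10 vs 288
    }
}
{code}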



was (Author: ckamal):
We may need a discussion before changing the behavior. One issue that can come 
is if the user sets the below configs:
{code:java}
segment.bytes = 1 GB
local.retention.bytes = 1 KB
{code}
Going by this JIRA, we will rotate the active segment and mark it eligible for 
local-log deletion. This can lead to huge number of smaller-size segments which 
impacts the RemoteLogMetadataManager as we have to maintain the metadata for 
all the segments. Impact on the Remote Storage Manager can be increase in the 
number of file descriptors etc.


> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expires the active segment when the max segment 
> timestamp matches the condition.
> - retention.bytes only expires the active segment when retention.bytes is 
> configured to zero.
> The behavior should be to either rotate active segments for both retention 
> configurations or for neither.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-05-20 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847876#comment-17847876
 ] 

Kamal Chandraprakash edited comment on KAFKA-16414 at 5/20/24 1:14 PM:
---

We may need a discussion before changing the behavior. One issue that can arise 
is if the user sets the configs below:
{code:java}
segment.bytes = 1 GB
local.retention.bytes = 1 KB
{code}
Going by this JIRA, we will rotate the active segment and mark it eligible for 
local-log deletion. This can lead to a huge number of small segments, which 
impacts the RemoteLogMetadataManager since we have to maintain the metadata for 
all the segments. The impact on the Remote Storage Manager can be an increase in 
the number of file descriptors, etc.



was (Author: ckamal):
We may need a discussion before changing the behavior. One issue that can come 
is if the user sets the below configs:
{code:java}
segment.bytes = 1 GB
local.retention.bytes = 1 KB
{code}
Going by this JIRA, we will rotate the active segment and mark it eligible for 
local-log deletion. This can lead to huge number of smaller-size segments which 
impacts the RemoteLogMetadataManager as we have to maintain the metadata for 
all the segments.

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expires the active segment when the max segment 
> timestamp matches the condition.
> - retention.bytes only expires the active segment when retention.bytes is 
> configured to zero.
> The behavior should be to either rotate active segments for both retention 
> configurations or for neither.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-05-20 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847876#comment-17847876
 ] 

Kamal Chandraprakash commented on KAFKA-16414:
--

We may need a discussion before changing the behavior. One issue that can arise 
is if the user sets the configs below:
{code:java}
segment.bytes = 1 GB
local.retention.bytes = 1 KB
{code}
Going by this JIRA, we will rotate the active segment and mark it eligible for 
local-log deletion. This can lead to a huge number of small segments, which 
impacts the RemoteLogMetadataManager since we have to maintain the metadata for 
all the segments.

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expires the active segment when the max segment 
> timestamp matches the condition.
> - retention.bytes only expires the active segment when retention.bytes is 
> configured to zero.
> The behavior should be to either rotate active segments for both retention 
> configurations or for neither.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-05-20 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847873#comment-17847873
 ] 

Kamal Chandraprakash commented on KAFKA-16385:
--

Adding the comment for posterity:

> OK, so, even if we adopt the option 2, we still cannot guarantee all the data 
> expire the 1 day limit will get deleted. Let's say, Right before the 
> retention thread starting to check, a new record arrived. Then, in this case, 
> this segment won't be eligible for expiration even though it contains data 
> over 1 day. And it still breaks the contract of the retention.ms.

If segment.ms is configured to 1 day, then all segments, whether active or 
stale, get rotated once a day and become eligible for deletion by the log 
cleaner thread. The deletion may not be exact; in the worst case, {{deletion 
time = retention.ms + segment.ms}}. A small worked example is shown below.
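
To make the worst case concrete, a small worked example assuming retention.ms = 
segment.ms = 1 day (the retention check interval adds a little more on top):
{code:java}
public class WorstCaseRetention {
    public static void main(String[] args) {
        long retentionMs = 24L * 60 * 60 * 1000; // retention.ms = 1 day
        long segmentMs   = 24L * 60 * 60 * 1000; // segment.ms   = 1 day

        // A record written right after a roll is the oldest in its segment; the segment's
        // largest timestamp can be up to segment.ms later, and retention is measured from
        // that largest timestamp, so the record can live ~retention.ms + segment.ms.
        long worstCaseMs = retentionMs + segmentMs;
        System.out.printf("worst-case record lifetime: ~%d hours%n",
                worstCaseMs / (60 * 60 * 1000)); // ~48 hours
    }
}
{code}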

> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8.0
>
>
> Steps to reproduce:
> 0. Start a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Create a topic with the configs: segment.ms=7days, segment.bytes=1GB, 
> retention.ms=1sec.
> 2. Send a record "aaa" to the topic.
> 3. Wait for 1 second.
> Will this segment be rolled? I thought not.
> But what I have tested shows that it does roll:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled because the log retention time of 1000 ms was breached, 
> which is unexpected.
> Tested on v3.5.1; it has the same issue.
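
For reference, the configuration from the steps above written out as broker/topic 
properties (a sketch; 7 days = 604800000 ms and 1 GB = 1073741824 bytes):
{code:java}
# broker config (server.properties): speed up the retention check for the test
log.retention.check.interval.ms=1000

# topic-level configs for the test topic
segment.ms=604800000
segment.bytes=1073741824
retention.ms=1000
{code}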



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic

2024-05-16 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16780:
-
Description: 
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the non-txn topics. If the topic is enabled with the 
transaction, then we expect the transaction to either commit/rollback within 15 
minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to 
search only a few remote log segments to collect the aborted txns.

  was:
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only a few remote log segments to collect the aborted txns.


> Txn consumer exerts pressure on remote storage when reading non-txn topic
> -
>
> Key: KAFKA-16780
> URL: https://issues.apache.org/jira/browse/KAFKA-16780
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> h3. Logic to read aborted txns:
>  # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads 
> a non-txn topic, then the broker has to 
> [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
>  all the local log segments to collect the aborted transactions since there 
> won't be any entry in the transaction index.
>  # The same 
> [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
>  is applied while reading from remote storage. In this case, when the FETCH 
> request is reading data from the first remote log segment, then it has to 
> fetch the transaction indexes of all the remaining remote-log segments, and 
> then the call lands to the local-log segments before responding to the FETCH 
> request which increases the time taken to serve the requests.
> The [EoS Abort 
> Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
>  design doc explains how the transaction index file filters out the aborted 
> transaction records.
> The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
> level but read the non-txn topics. If the topic is enabled with the 
> transaction, then we expect the transaction to either commit/rollback within 
> 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may 
> have to search only a few remote 

[jira] [Updated] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic

2024-05-16 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16780:
-
Description: 
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only a few remote log segments to collect the aborted txns.

  was:
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only few remote log segments to collect the aborted txns.


> Txn consumer exerts pressure on remote storage when reading non-txn topic
> -
>
> Key: KAFKA-16780
> URL: https://issues.apache.org/jira/browse/KAFKA-16780
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> h3. Logic to read aborted txns:
>  # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads 
> a non-txn topic, then the broker has to 
> [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
>  all the local log segments to collect the aborted transactions since there 
> won't be any entry in the transaction index.
>  # The same 
> [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
>  is applied while reading from remote storage. In this case, when the FETCH 
> request is reading data from the first remote log segment, then it has to 
> fetch the transaction indexes of all the remaining remote-log segments, and 
> then the call lands to the local-log segments before responding to the FETCH 
> request which increases the time taken to serve the requests.
> The [EoS Abort 
> Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
>  design doc explains how the transaction index file filters out the aborted 
> transaction records.
> The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
> level but read the normal topics. If the topic is enabled with the 
> transaction, then we expect the transaction to either commit/rollback within 
> 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may 
> have to search only a few remote log 

[jira] [Updated] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic

2024-05-16 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16780:
-
Description: 
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only few remote log segments to collect the aborted txns.

  was:
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only for one (or) two remote log segments.


> Txn consumer exerts pressure on remote storage when reading non-txn topic
> -
>
> Key: KAFKA-16780
> URL: https://issues.apache.org/jira/browse/KAFKA-16780
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> h3. Logic to read aborted txns:
>  # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads 
> a non-txn topic, then the broker has to 
> [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
>  all the local log segments to collect the aborted transactions since there 
> won't be any entry in the transaction index.
>  # The same 
> [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
>  is applied while reading from remote storage. In this case, when the FETCH 
> request is reading data from the first remote log segment, then it has to 
> fetch the transaction indexes of all the remaining remote-log segments, and 
> then the call lands to the local-log segments before responding to the FETCH 
> request which increases the time taken to serve the requests.
> The [EoS Abort 
> Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
>  design doc explains how the transaction index file filters out the aborted 
> transaction records.
> The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
> level but read the normal topics. If the topic is enabled with the 
> transaction, then we expect the transaction to either commit/rollback within 
> 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may 
> have to search only few remote log segments to collect 

[jira] [Created] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic

2024-05-16 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16780:


 Summary: Txn consumer exerts pressure on remote storage when 
reading non-txn topic
 Key: KAFKA-16780
 URL: https://issues.apache.org/jira/browse/KAFKA-16780
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash


h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only for one (or) two remote log segments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16696) Remove the in-memory implementation of RSM and RLMM

2024-05-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16696:
-
Description: 
The in-memory implementation of RSM and RLMM were written to write the 
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and superseded by the LocalTieredStorage 
framework which uses local-disk as secondary storage and topic as RLMM. Using 
the LocalTieredStorage framework is the preferred way to write the integration 
tests to capture any regression as it uses the internal topic as storage for 
RLMM which is the default implementation. 

  was:
The in-memory implementation of RSM and RLMM were written to write the 
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and superseded by the LocalTieredStorage 
framework which uses local-disk as secondary storage and topic as RLMM. Using 
the LocalTieredStorage framework is the preferred way to write the integration 
tests to capture any regression as it uses the official topic as storage for 
RLMM.


> Remove the in-memory implementation of RSM and RLMM
> ---
>
> Key: KAFKA-16696
> URL: https://issues.apache.org/jira/browse/KAFKA-16696
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Minor
>
> The in-memory implementation of RSM and RLMM were written to write the 
> unit/integration tests: [https://github.com/apache/kafka/pull/10218]
> This is not used by any of the tests and superseded by the LocalTieredStorage 
> framework which uses local-disk as secondary storage and topic as RLMM. Using 
> the LocalTieredStorage framework is the preferred way to write the 
> integration tests to capture any regression as it uses the internal topic as 
> storage for RLMM which is the default implementation. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16696) Remove the in-memory implementation of RSM and RLMM

2024-05-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16696:
-
Description: 
The in-memory implementation of RSM and RLMM were written to write the 
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and superseded by the LocalTieredStorage 
framework which uses local-disk as secondary storage and topic as RLMM. Using 
the LocalTieredStorage framework is the preferred way to write the integration 
tests to capture any regression as it uses the official topic as storage for 
RLMM.

  was:
The in-memory implementation of RSM and RLMM were written to write the 
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and superseded by the LocalTieredStorage 
framework which uses local-disk as secondary storage and topic as RLMM.


> Remove the in-memory implementation of RSM and RLMM
> ---
>
> Key: KAFKA-16696
> URL: https://issues.apache.org/jira/browse/KAFKA-16696
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Minor
>
> The in-memory implementation of RSM and RLMM were written to write the 
> unit/integration tests: [https://github.com/apache/kafka/pull/10218]
> This is not used by any of the tests and superseded by the LocalTieredStorage 
> framework which uses local-disk as secondary storage and topic as RLMM. Using 
> the LocalTieredStorage framework is the preferred way to write the 
> integration tests to capture any regression as it uses the official topic as 
> storage for RLMM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16696) Remove the in-memory implementation of RSM and RLMM

2024-05-09 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16696:


 Summary: Remove the in-memory implementation of RSM and RLMM
 Key: KAFKA-16696
 URL: https://issues.apache.org/jira/browse/KAFKA-16696
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


The in-memory implementation of RSM and RLMM were written to write the 
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

This is not used by any of the tests and superseded by the LocalTieredStorage 
framework which uses local-disk as secondary storage and topic as RLMM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-05-03 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843185#comment-17843185
 ] 

Kamal Chandraprakash commented on KAFKA-16511:
--

[~showuon] 

Your analysis is correct. I tried handling this case in the PR. 

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics that have not been written to for a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> is never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments.
> When looking at one of the non-leaking segments, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> This looks right, because we can see logs from both the plugin and the remote 
> log manager indicating that the remote log segment was removed.
> Now if I look at one of the leaked segments, here is what I see:
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=10001, 
> maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
> segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
> customMetadata=Optional.empty, 

[jira] [Updated] (KAFKA-16605) Fix the flaky LogCleanerParameterizedIntegrationTest

2024-04-23 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16605:
-
Description: 
https://ge.apache.org/scans/tests?search.relativeStartTime=P7D=kafka=Asia%2FCalcutta=kafka.log.LogCleanerParameterizedIntegrationTest=testCleansCombinedCompactAndDeleteTopic(CompressionType)%5B1%5D

> Fix the flaky LogCleanerParameterizedIntegrationTest
> 
>
> Key: KAFKA-16605
> URL: https://issues.apache.org/jira/browse/KAFKA-16605
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> https://ge.apache.org/scans/tests?search.relativeStartTime=P7D=kafka=Asia%2FCalcutta=kafka.log.LogCleanerParameterizedIntegrationTest=testCleansCombinedCompactAndDeleteTopic(CompressionType)%5B1%5D



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16605) Fix the flaky LogCleanerParameterizedIntegrationTest

2024-04-23 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16605:


 Summary: Fix the flaky LogCleanerParameterizedIntegrationTest
 Key: KAFKA-16605
 URL: https://issues.apache.org/jira/browse/KAFKA-16605
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15480) Add RemoteStorageInterruptedException

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15480:
-
Labels: kip  (was: )

> Add RemoteStorageInterruptedException
> -
>
> Key: KAFKA-15480
> URL: https://issues.apache.org/jira/browse/KAFKA-15480
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Mital Awachat
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> Introduce `RemoteStorageInterruptedException` to propagate interruptions from 
> the plugin to Kafka without generated (false) errors. 
> It allows the plugin to notify Kafka an API operation in progress was 
> interrupted as a result of task cancellation, which can happen under changes 
> such as leadership migration or topic deletion.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15776:
-
Labels: kip  (was: )

> Update delay timeout for DelayedRemoteFetch request
> ---
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait 
> for the given amount of time when there is no data available to serve the 
> FETCH request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user about how to configure an optimal value for each purpose. Moreover, the 
> config is of *LOW* importance, and most users won't configure it and will use 
> the default value of 500 ms.
> Having a delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to a 
> higher number of expired delayed remote fetch requests when the remote 
> storage has any degradation.
> We should introduce a {{fetch.remote.max.wait.ms}} config (preferably a 
> server config) to define the delay timeout for DelayedRemoteFetch requests, 
> or take it from the client, similar to {{request.timeout.ms}}.
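
For illustration, a minimal sketch of the two roles this timeout currently plays, 
assuming the kafka-clients dependency is on the classpath. The consumer property 
below is the existing {{fetch.max.wait.ms}}; {{fetch.remote.max.wait.ms}} is only 
the config proposed in this ticket (KIP-1018), not an existing setting:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FetchWaitConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Client side: how long the broker may block a FETCH when fetch.min.bytes
        // is not yet satisfied (default 500 ms).
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        // Broker side: today the same 500 ms also bounds DelayedRemoteFetch, which is
        // what this ticket proposes to decouple via a separate fetch.remote.max.wait.ms
        // (proposed in KIP-1018, not an existing config).
    }
}
{code}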



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15776:
-
Labels: kip-1018  (was: kip)

> Update delay timeout for DelayedRemoteFetch request
> ---
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip-1018
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait 
> for the given amount of time when there is no data available to serve the 
> FETCH request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user about how to configure an optimal value for each purpose. Moreover, the 
> config is of *LOW* importance, and most users won't configure it and will use 
> the default value of 500 ms.
> Having a delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to a 
> higher number of expired delayed remote fetch requests when the remote 
> storage has any degradation.
> We should introduce a {{fetch.remote.max.wait.ms}} config (preferably a 
> server config) to define the delay timeout for DelayedRemoteFetch requests, 
> or take it from the client, similar to {{request.timeout.ms}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15341:


Assignee: Kamal Chandraprakash

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.8.0
>
>
> When we are in a rolling restart to enable TS at the system level, some brokers 
> have TS enabled on them and some don't. When we send an alterConfig call to 
> enable TS for a topic and it hits a broker which has TS enabled, this broker 
> forwards it to the controller, and the controller sends the config update to 
> all brokers. When another broker which doesn't have TS enabled (because it 
> hasn't undergone the restart yet) gets this config change, it "should" fail 
> to apply it. But failing at that point is too late, since alterConfig has 
> already succeeded and controller->broker config propagation is done async.
> With this JIRA, we want to have controller check if TS is enabled on all 
> brokers before applying alter config to turn on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15341:


Assignee: (was: Kamal Chandraprakash)

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.8.0
>
>
> When we are in a rolling restart to enable TS at the system level, some brokers 
> have TS enabled on them and some don't. When we send an alterConfig call to 
> enable TS for a topic and it hits a broker which has TS enabled, this broker 
> forwards it to the controller, and the controller sends the config update to 
> all brokers. When another broker which doesn't have TS enabled (because it 
> hasn't undergone the restart yet) gets this config change, it "should" fail 
> to apply it. But failing at that point is too late, since alterConfig has 
> already succeeded and controller->broker config propagation is done async.
> With this JIRA, we want to have controller check if TS is enabled on all 
> brokers before applying alter config to turn on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15682:


Assignee: Kamal Chandraprakash

> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> One of the implementations of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic 
> {{__remote_log_metadata}} to store the metadata about the remote log 
> segments. Unlike other internal topics, which are compaction enabled, this 
> topic is not compacted and its retention is set to unlimited. 
> Keeping this internal topic's retention unlimited is not practical in 
> real-world use cases, where the topic's local disk usage footprint grows huge 
> over a period of time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on that assumption; we need an assertion to ensure that the internal 
> {{__remote_log_metadata}} segments are not eligible for deletion before the 
> expiry of all the relevant user-topic uploaded remote-log segments. 
> Otherwise there will be dangling remote-log segments which won't be cleared 
> once all the brokers are restarted after the internal topic retention cleanup.
> See the discussion thread: 
> https://github.com/apache/kafka/pull/14576#discussion_r1368576126
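
A minimal sketch of the kind of assertion suggested above, with placeholder 
names; a real check would also have to account for size-based retention and 
deleted topics. The idea is that the metadata topic's retention must be at least 
the largest user-topic retention plus some buffer X, treating -1 as unlimited:
{code:java}
import java.util.Map;

public class MetadataTopicRetentionCheck {
    // Hedged sketch: is the __remote_log_metadata retention safely above the
    // largest user-topic retention plus a buffer (the "max + X" assumption)?
    static boolean metadataRetentionIsSafe(Map<String, Long> userTopicRetentionMs,
                                           long metadataTopicRetentionMs,
                                           long bufferMs) {
        long maxUserRetentionMs = userTopicRetentionMs.values().stream()
                .mapToLong(Long::longValue).max().orElse(0L);
        return metadataTopicRetentionMs < 0                 // -1 means unlimited retention
                || metadataTopicRetentionMs >= maxUserRetentionMs + bufferMs;
    }

    public static void main(String[] args) {
        Map<String, Long> userTopics = Map.of(
                "orders", 7L * 86_400_000L,   // 7 days
                "clicks", 3L * 86_400_000L);  // 3 days
        long metadataRetentionMs = 10L * 86_400_000L;        // 10 days
        System.out.println(metadataRetentionIsSafe(userTopics, metadataRetentionMs, 86_400_000L)); // true
    }
}
{code}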



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15682:


Assignee: (was: Kamal Chandraprakash)

> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> One of the implementations of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic 
> {{__remote_log_metadata}} to store the metadata about the remote log 
> segments. Unlike other internal topics, which are compaction enabled, this 
> topic is not compacted and its retention is set to unlimited. 
> Keeping this internal topic's retention unlimited is not practical in 
> real-world use cases, where the topic's local disk usage footprint grows huge 
> over a period of time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on that assumption; we need an assertion to ensure that the internal 
> {{__remote_log_metadata}} segments are not eligible for deletion before the 
> expiry of all the relevant user-topic uploaded remote-log segments. 
> Otherwise there will be dangling remote-log segments which won't be cleared 
> once all the brokers are restarted after the internal topic retention cleanup.
> See the discussion thread: 
> https://github.com/apache/kafka/pull/14576#discussion_r1368576126



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-9578) Kafka Tiered Storage - System Tests

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-9578:
---

Assignee: (was: Kamal Chandraprakash)

> Kafka Tiered Storage - System  Tests
> 
>
> Key: KAFKA-9578
> URL: https://issues.apache.org/jira/browse/KAFKA-9578
> Project: Kafka
>  Issue Type: Test
>Reporter: Harsha
>Priority: Major
> Fix For: 3.8.0
>
>
> Initial test cases set up by [~Ying Zheng] 
>  
> [https://docs.google.com/spreadsheets/d/1gS0s1FOmcjpKYXBddejXAoJAjEZ7AdEzMU9wZc-JgY8/edit#gid=0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-13560) Load indexes and data in async manner in the critical path of replica fetcher threads.

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-13560:


Assignee: (was: Kamal Chandraprakash)

> Load indexes and data in async manner in the critical path of replica fetcher 
> threads. 
> ---
>
> Key: KAFKA-13560
> URL: https://issues.apache.org/jira/browse/KAFKA-13560
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.8.0
>
>
> https://github.com/apache/kafka/pull/11390#discussion_r762366976
> https://github.com/apache/kafka/pull/11390#discussion_r1033141283



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16511) Leaking tiered segments

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-16511:


Assignee: Kamal Chandraprakash

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics that have not been written to for a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> is never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments.
> When looking at one of the non-leaking segments, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=10001, 
> maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
> segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> 

[jira] [Assigned] (KAFKA-14915) Option to consume multiple partitions that have their data in remote storage for the target offsets.

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-14915:


Assignee: (was: Kamal Chandraprakash)

> Option to consume multiple partitions that have their data in remote storage 
> for the target offsets.
> 
>
> Key: KAFKA-14915
> URL: https://issues.apache.org/jira/browse/KAFKA-14915
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Satish Duggana
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.8.0
>
>
> Context: https://github.com/apache/kafka/pull/13535#discussion_r1171250580



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16511) Leaking tiered segments

2024-04-12 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16511:
-
Description: 
I have some topics that have not been written to for a few days (they have 12h 
retention), yet some of their data remains on tiered storage (in our case S3) and 
is never deleted.

Looking at the log history, it appears that we never even tried to delete these 
segments.

When looking at one of the non-leaking segments, I see the following interesting 
messages:

 
{code:java}
"2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
segment-end-offset: 2976819 and segment-epochs: [5]"
"2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
for completed successfully 
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ}
, startOffset=2968418, endOffset=2976819, brokerId=10029, 
maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
"2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ}
, startOffset=2968418, endOffset=2976819, brokerId=10029, 
maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
"2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
02968418.log to remote storage with segment-id: 
RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ}"
"2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
completed successfully, metadata: 
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ}
, startOffset=2968418, endOffset=2976819, brokerId=10029, 
maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
"2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ}
, startOffset=2968418, endOffset=2976819, brokerId=10029, 
maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
{code}
 

This looks right, because we can see logs from both the plugin and the remote log 
manager indicating that the remote log segment was removed.

Now if I look at one of the leaked segments, here is what I see:

 
{code:java}
"2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
02971163.log to remote storage with segment-id: 
RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
id=8dP13VDYSaiFlubl9SNBTQ}"
"2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
completed successfully, metadata: 
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
id=8dP13VDYSaiFlubl9SNBTQ}
, startOffset=2971163, endOffset=2978396, brokerId=10001, 
maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
"2024-04-02T00:43:20.003Z","""kafka""","""10001""","Copying log segment data, 
metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
id=8dP13VDYSaiFlubl9SNBTQ}
, startOffset=2971163, endOffset=2978396, brokerId=10001, 
maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
 
{code}
I see no errors whatsoever indicating that the remote log deletion was 
actually triggered and failed.

I tried rolling-restarting my cluster to see if refreshing 

[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-12 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836762#comment-17836762
 ] 

Kamal Chandraprakash commented on KAFKA-16511:
--

[~fvisconte] 
The issue might be due to overlapping remote log segments after a new leader 
gets elected during a rolling restart. Could you please upload the 
remote-log-segment metadata events for the past 10 segments of the 
5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765 partition? Thanks!

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=10001, 
> 

[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-12 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836444#comment-17836444
 ] 

Kamal Chandraprakash commented on KAFKA-16511:
--

The segment deletion might be stuck due to the 
[RemoteLogManager#isRemoteSegmentWithinLeaderEpochs|https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1241]
 check. The {{log-start-offset}} for partition 765 might have been moved using 
the {{kafka-delete-records.sh}} script, so the check fails to mark the segment 
as a valid one.
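For reference, a highly simplified sketch of the kind of lineage check referred to above (the method and parameter shapes are assumptions for illustration, not the actual RemoteLogManager code): if a segment's leader epochs no longer line up with the current leader-epoch cache, the segment is not treated as part of the lineage and retention skips it.
{code:java}
import java.util.Map;
import java.util.NavigableMap;

public final class LeaderEpochLineageCheck {
    // Returns true only if every (epoch -> startOffset) entry recorded for the
    // remote segment still exists in the leader-epoch cache and starts at or
    // after the cached epoch's start offset. If the cache was truncated (for
    // example after the log-start-offset moved forward), the lookup misses and
    // the segment is never selected for retention-based deletion.
    public static boolean isWithinLeaderEpochs(Map<Integer, Long> segmentLeaderEpochs,
                                               NavigableMap<Integer, Long> leaderEpochCache) {
        for (Map.Entry<Integer, Long> entry : segmentLeaderEpochs.entrySet()) {
            Long cachedStartOffset = leaderEpochCache.get(entry.getKey());
            if (cachedStartOffset == null || entry.getValue() < cachedStartOffset) {
                return false;
            }
        }
        return true;
    }
}
{code}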

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> 

[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-11 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836289#comment-17836289
 ] 

Kamal Chandraprakash commented on KAFKA-16511:
--

Can you also paste the contents of the {{leader-epoch-checkpoint}} file so we can 
see the leader transitions?
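For reference, the {{leader-epoch-checkpoint}} file under the partition's log directory is a small text file: a version line, an entry-count line, and then one {{<epoch> <startOffset>}} pair per line. The entries below are only illustrative values:
{code}
0
2
5 2968418
8 2980106
{code}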

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=10001, 
> maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
> segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
> customMetadata=Optional.empty, 

[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-11 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836277#comment-17836277
 ] 

Kamal Chandraprakash commented on KAFKA-16511:
--

[~fvisconte]

> We can see that despite the retention period being largely exceeded, there 
> are 2060 offsets that are never expiring.

Can you check the local log segments of the current leader for partition 765? 

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=10001, 
> maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
> 

[jira] (KAFKA-16511) Leaking tiered segments

2024-04-11 Thread Kamal Chandraprakash (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16511 ]


Kamal Chandraprakash deleted comment on KAFKA-16511:
--

was (Author: ckamal):
[~fvisconte] 

From the logs, the issue is on partition 765 and segment 
`02971163.log`. There can be dangling segments in remote 
storage if there are retries while uploading a segment. In this case, the 
uploader thread uploaded the segment but failed to write the 
COPY_SEGMENT_FINISHED event. If you switch the leader to a different replica, 
then the dangling segment will be removed.


1. The earliest and latest offsets for partition 765 are the same, which matches 
the expected value (the topic has not been written to for a few days and has 12h 
retention).
2. Was the same segment re-uploaded and deleted with a different 
remote-log-segment-id? Can you check your logs?
3. Did you move the log-start-offset using the {{kafka-delete-records.sh}} 
script in the middle of a segment upload?


> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with 

[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-11 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836276#comment-17836276
 ] 

Kamal Chandraprakash commented on KAFKA-16511:
--

[~fvisconte] 

From the logs, the issue is on partition 765 and segment 
`02971163.log`. There can be dangling segments in remote 
storage if there are retries while uploading a segment. In this case, the 
uploader thread uploaded the segment but failed to write the 
COPY_SEGMENT_FINISHED event. If you switch the leader to a different replica, 
then the dangling segment will be removed.


1. The earliest and latest offsets for partition 765 are the same, which matches 
the expected value (the topic has not been written to for a few days and has 12h 
retention).
2. Was the same segment re-uploaded and deleted with a different 
remote-log-segment-id? Can you check your logs?
3. Did you move the log-start-offset using the {{kafka-delete-records.sh}} 
script in the middle of a segment upload?
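If it helps with checking point 3, advancing the log-start-offset with {{kafka-delete-records.sh}} takes an offsets JSON file like the one below; the topic name, partition, and offset shown here are made-up placeholders:
{code}
cat > /tmp/delete-records.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "topic1_3543", "partition": 765, "offset": 2971163 }
  ]
}
EOF
sh kafka-delete-records.sh --bootstrap-server localhost:9092 \
  --offset-json-file /tmp/delete-records.json
{code}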


> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics there were not written since a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> they are never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look on one of the leaked segment, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] 

[jira] [Resolved] (KAFKA-16456) Can't stop kafka debug logs

2024-04-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash resolved KAFKA-16456.
--
Resolution: Not A Problem

You can also dynamically change the broker loggers using the 
{{kafka-configs.sh}} script, e.g.:
{code}
sh kafka-configs.sh --bootstrap-server localhost:9092 --entity-type broker-loggers \
  --entity-name  --add-config org.apache.kafka.clients.NetworkClient=INFO --alter
{code}

> Can't stop kafka debug logs
> ---
>
> Key: KAFKA-16456
> URL: https://issues.apache.org/jira/browse/KAFKA-16456
> Project: Kafka
>  Issue Type: Bug
>  Components: logging
>Affects Versions: 3.6.0
>Reporter: Rajan Choudhary
>Priority: Major
>
> I am getting kafka debug logs, which are flooding our logs. Sample below
>  
> {code:java}
> 09:50:38.073 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, 
> transactionalId=sqout-3664816744674374805414] Received API_VERSIONS response 
> from node 5 for request with header RequestHeader(apiKey=API_VERSIONS, 
> apiVersion=3, clientId=maximo-mp, correlationId=8): 
> ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, 
> minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, 
> maxVersion=13), ApiVersion(apiKey=2, minVersion=0, maxVersion=7), 
> ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, 
> minVersion=0, maxVersion=5), ApiVersion(apiKey=5, minVersion=0, 
> maxVersion=3), ApiVersion(apiKey=6, minVersion=0, maxVersion=7), 
> ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, 
> minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, 
> maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), 
> ApiVersion(apiKey=11, minVersion=0, maxVersion=7), ApiVersion(apiKey=12, 
> minVersion=0, m...
> 09:50:38.073 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, 
> transactionalId=sqout-3664816744674374805414] Node 5 has finalized features 
> epoch: 1, finalized features: [], supported features: [], API versions: 
> (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 13 [usable: 12], 
> ListOffsets(2): 0 to 7 [usable: 6], Metadata(3): 0 to 12 [usable: 11], 
> LeaderAndIsr(4): 0 to 5 [usable: 5], StopReplica(5): 0 to 3 [usable: 3], 
> UpdateMetadata(6): 0 to 7 [usable: 7], ControlledShutdown(7): 0 to 3 [usable: 
> 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], 
> FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], 
> Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], 
> SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], 
> ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], 
> ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], 
> Del...
> 09:50:38.074 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
> clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] ProducerId 
> of partition sqout-0 set to 43458621 with epoch 0. Reinitialize sequence at 
> beginning.
> 09:50:38.074 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.producer.internals.RecordAccumulator - [Producer 
> clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] Assigned 
> producerId 43458621 and producerEpoch 0 to batch with base sequence 0 being 
> sent to partition sqout-0
> 09:50:38.075 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, 
> transactionalId=sqout-3664816744674374805414] Sending PRODUCE request with 
> header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=maximo-mp, 
> correlationId=9) and timeout 3 to node 5: 
> {acks=-1,timeout=3,partitionSizes=[sqout-0=4181]}
> 09:50:38.095 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=maximo-mp, 
> transactionalId=sqout-3664816744674374805414] Received PRODUCE response from 
> node 5 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, 
> clientId=maximo-mp, correlationId=9): 
> ProduceResponseData(responses=[TopicProduceResponse(name='sqout', 
> partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, 
> baseOffset=796494, logAppendTimeMs=-1, logStartOffset=768203, 
> recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
> 09:50:38.095 [kafka-producer-network-thread | maximo-mp] DEBUG 
> org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
> clientId=maximo-mp, transactionalId=sqout-3664816744674374805414] ProducerId: 
> 43458621; Set 

[jira] [Updated] (KAFKA-16454) Snapshot the state of remote log metadata for all the partitions

2024-04-01 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16454:
-
Description: 
When restarting the broker, we read the {{__remote_log_metadata}} topic from the 
beginning to reconstruct the state of remote log segments. Instead, we can 
snapshot the state of remote log segments under each partition directory. 

Previous work to snapshot the state of the remote log metadata was removed as 
the solution is incomplete:

https://github.com/apache/kafka/pull/15636

  was:
When restarting the broker, we are reading from the beginning of the  
{{__remote_log_metadata}} topic to reconstruct the state of remote log 
segments, instead we can snapshot the state of remote log segments under each 
partition directory. 

Previous work to snapshot the state of the remote log metadata are removed as 
the solution is not complete. 

https://github.com/apache/kafka/pull/15636


> Snapshot the state of remote log metadata for all the partitions
> 
>
> Key: KAFKA-16454
> URL: https://issues.apache.org/jira/browse/KAFKA-16454
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> When restarting the broker, we are reading from the beginning of the  
> {{__remote_log_metadata}} topic to reconstruct the state of remote log 
> segments, instead we can snapshot the state of remote log segments under each 
> partition directory. 
> Previous work to snapshot the state of the remote log metadata are removed as 
> the solution is incomplete:
> https://github.com/apache/kafka/pull/15636



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16454) Snapshot the state of remote log metadata for all the partitions

2024-04-01 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16454:


 Summary: Snapshot the state of remote log metadata for all the 
partitions
 Key: KAFKA-16454
 URL: https://issues.apache.org/jira/browse/KAFKA-16454
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash


When restarting the broker, we are reading from the beginning of the  
{{__remote_log_metadata}} topic to reconstruct the state of remote log 
segments, instead we can snapshot the state of remote log segments under each 
partition directory. 

Previous work to snapshot the state of the remote log metadata are removed as 
the solution is not complete. 

https://github.com/apache/kafka/pull/15636
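Purely as an illustration of the idea (the file name, serialization format, and field set below are assumptions, not a proposed format): write the in-memory remote-log-segment state to a snapshot file under each partition directory so that a restart can load it instead of replaying {{__remote_log_metadata}} from the beginning.
{code:java}
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

final class RemoteLogMetadataSnapshotSketch {
    // Minimal stand-in for the per-segment state we would want to survive a restart.
    record SegmentState(String segmentId, long startOffset, long endOffset, String state)
            implements Serializable { }

    // Write the snapshot to a temp file first and rename it into place, so a
    // crash mid-write never leaves a truncated snapshot behind.
    static void write(Path partitionDir, List<SegmentState> segments) throws IOException {
        Path tmp = partitionDir.resolve("remote-log-metadata.snapshot.tmp");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(tmp))) {
            out.writeObject(new ArrayList<>(segments));
        }
        Files.move(tmp, partitionDir.resolve("remote-log-metadata.snapshot"),
                StandardCopyOption.ATOMIC_MOVE);
    }

    // On startup, load the snapshot if present; an empty result means the broker
    // still has to replay the __remote_log_metadata topic from the beginning.
    @SuppressWarnings("unchecked")
    static List<SegmentState> read(Path partitionDir) throws IOException, ClassNotFoundException {
        Path file = partitionDir.resolve("remote-log-metadata.snapshot");
        if (!Files.exists(file)) {
            return List.of();
        }
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return (List<SegmentState>) in.readObject();
        }
    }
}
{code}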



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16452) Bound highwatermark offset to range b/w local-log-start-offset and log-end-offset

2024-03-31 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16452:


 Summary: Bound highwatermark offset to range b/w 
local-log-start-offset and log-end-offset
 Key: KAFKA-16452
 URL: https://issues.apache.org/jira/browse/KAFKA-16452
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


The high watermark should not go below the local-log-start-offset. If the high 
watermark is less than the local-log-start-offset, then the 
[UnifiedLog#fetchHighWatermarkMetadata|https://sourcegraph.com/github.com/apache/kafka@d4caa1c10ec81b9c87eaaf52b73c83d5579b68d3/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L358]
 method will throw the OFFSET_OUT_OF_RANGE error when it converts the offset to 
metadata. Once this error happens, the followers will receive out-of-range 
exceptions and the producers won't be able to produce messages since the leader 
cannot move the high watermark.

This issue can happen when the partition undergoes recovery due to corruption 
in the checkpoint file and it gets elected as leader before it gets a chance to 
update the HW from the previous leader.
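A minimal sketch of the intended bounding (an assumed helper name for illustration; the actual change would live in the high-watermark update path of UnifiedLog):
{code:java}
final class HighWatermarkBounds {
    // Clamp the high watermark into [local-log-start-offset, log-end-offset] so
    // that converting it to offset metadata can never land outside the local log
    // and trigger OFFSET_OUT_OF_RANGE.
    static long boundHighWatermark(long proposedHw, long localLogStartOffset, long logEndOffset) {
        return Math.min(Math.max(proposedHw, localLogStartOffset), logEndOffset);
    }
}
{code}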



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-03-31 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832561#comment-17832561
 ] 

Kamal Chandraprakash edited comment on KAFKA-16414 at 3/31/24 9:25 AM:
---

My suggestion is to keep the existing behavior for the {{retention.bytes}} config. 
Assume the user configured {{retention.bytes}} = 1 byte with the default 
{{segment.bytes}} = 1 GB, and there is a single record batch in the active segment 
with a total size of 5 KB. If we rotate the active segment and delete it, then all 
the data for that partition will be removed. Shouldn't we retain the 
one byte of data, i.e. the last batch? Handling this case will be tricky, as 
we would have to split the active segment into two and keep only the segment with 
the last batch, and even that would exceed the user-configured retention bytes.

For {{{}retention.ms{}}}, the behavior is correct since all the records/batches 
in that segment are older than the user-configured retention time, so we mark 
the active segment as eligible for deletion.


was (Author: ckamal):
My suggestion is to keep the existing behavior for {{retention.bytes}} config. 
Assume that the user configured the {{retention.bytes}} = 1 byte and default 
{{segment.bytes}} = 1 GB. And, there is one record batch in the active segment 
with total-size = 5 KB. If we rotate the active segment and delete it, then all 
the data for that partition will be removed. Shouldn't we have to maintain the 
one byte of data which is the last batch? Handling this case will be tricky, as 
we have to split the active segment into two, keep only the segment with 
last-batch, and it will also exceed the user-configured retention bytes.

For {{{}retention.ms{}}}, the behavior is correct since all the records/batches 
in that segments are older than the user-configured retention time, so we mark 
the active segment as eligible for deletion.
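For reference, a simplified sketch of how size-based retention picks deletable segments (illustrative only, not the UnifiedLog implementation): only whole inactive segments whose removal still leaves at least {{retention.bytes}} of log behind are selected, which is why the 5 KB batch in the active segment survives {{retention.bytes}} = 1 without splitting the segment.
{code:java}
import java.util.ArrayList;
import java.util.List;

final class SizeRetentionSketch {
    record Segment(long sizeInBytes, boolean active) { }

    // Walk segments from oldest to newest and collect those whose removal still
    // leaves at least retentionBytes of log behind; stop at the active segment or
    // when deleting the next segment would take the log below the limit.
    static List<Segment> deletableByRetentionBytes(List<Segment> segments, long retentionBytes) {
        long totalSize = segments.stream().mapToLong(Segment::sizeInBytes).sum();
        long excess = totalSize - retentionBytes;   // bytes we are allowed to remove
        List<Segment> deletable = new ArrayList<>();
        for (Segment segment : segments) {
            if (segment.active() || excess < segment.sizeInBytes()) {
                break;
            }
            deletable.add(segment);
            excess -= segment.sizeInBytes();
        }
        return deletable;
    }
}
{code}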

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-03-31 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832561#comment-17832561
 ] 

Kamal Chandraprakash edited comment on KAFKA-16414 at 3/31/24 9:25 AM:
---

My suggestion is to keep the existing behavior for {{retention.bytes}} config. 
Assume that the user configured the {{retention.bytes}} = 1 byte and default 
{{segment.bytes}} = 1 GB. And, there is one record batch in the active segment 
with total-size = 5 KB. If we rotate the active segment and delete it, then all 
the data for that partition will be removed. Shouldn't we have to maintain the 
one byte of data which is the last batch? Handling this case will be tricky, as 
we have to split the active segment into two, keep only the segment with 
last-batch, and it will also exceed the user-configured retention bytes.

For {{{}retention.ms{}}}, the behavior is correct since all the records/batches 
in that segments are older than the user-configured retention time, so we mark 
the active segment as eligible for deletion.


was (Author: ckamal):
My suggestion is to keep the existing behavior for {{retention.bytes}} config. 
Assume that the user configured the {{retention.bytes}} = 1 byte and default 
{{segment.bytes}} = 1 GB. And, there is one record batch in the active segment 
with total-size = 5 KB. If we rotate the active segment and delete it, then all 
the data for that partition will be removed. Shouldn't we have to maintain the 
one byte of data which is the last batch? Handling this case will be tricky, as 
we have to split the active segment into two. 

For {{{}retention.ms{}}}, the behavior is correct since all the records/batches 
in that segments are older than the user-configured retention time, so we mark 
the active segment as eligible for deletion.

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-03-31 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832561#comment-17832561
 ] 

Kamal Chandraprakash commented on KAFKA-16414:
--

My suggestion is to keep the existing behavior for {{retention.bytes}} config. 
Assume that the user configured the {{retention.bytes}} = 1 byte and default 
{{segment.bytes}} = 1 GB. And, there is one record batch in the active segment 
with total-size = 5 KB. If we rotate the active segment and delete it, then all 
the data for that partition will be removed. Shouldn't we have to maintain the 
one byte of data which is the last batch? Handling this case will be tricky, as 
we have to split the active segment into two. 

For {{{}retention.ms{}}}, the behavior is correct since all the records/batches 
in that segments are older than the user-configured retention time, so we mark 
the active segment as eligible for deletion.

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16161) Avoid creating remote log metadata snapshot file in partition data directory.

2024-03-27 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831619#comment-17831619
 ] 

Kamal Chandraprakash commented on KAFKA-16161:
--

Not yet started. I will work on this task this week.

> Avoid creating remote log metadata snapshot file in partition data directory.
> -
>
> Key: KAFKA-16161
> URL: https://issues.apache.org/jira/browse/KAFKA-16161
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: KIP-405
>
> Avoid creating remote log metadata snapshot file in a partition data 
> directory. This can be added when the snapshots implementation related 
> functionality is enabled end to end. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-03-26 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831161#comment-17831161
 ] 

Kamal Chandraprakash commented on KAFKA-16414:
--

I noticed the differing behavior of {{retention.ms}} and {{retention.bytes}} while 
working on the tiered storage integration tests. The behavior suited our 
requirement for integration tests, where we want to keep only the active segment 
locally and move all the passive segments to remote storage. 

If we update the behavior, it will make most of the tiered storage tests 
flaky, as all the segments will be rotated and uploaded to remote storage and the 
local log will be left empty. We will also have to update the tiered storage 
integration tests when changing the behavior of the {{retention.bytes}} config.

https://sourcegraph.com/github.com/apache/kafka@932647606504125e5c3ba0ae9470b4af335a0885/-/blob/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java?L174

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15206) Flaky test RemoteIndexCacheTest.testClose()

2024-03-12 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15206:


Assignee: Kamal Chandraprakash  (was: Lan Ding)

> Flaky test RemoteIndexCacheTest.testClose()
> ---
>
> Key: KAFKA-15206
> URL: https://issues.apache.org/jira/browse/KAFKA-15206
> Project: Kafka
>  Issue Type: Test
>Reporter: Divij Vaidya
>Assignee: Kamal Chandraprakash
>Priority: Minor
>  Labels: flaky-test
> Fix For: 3.8.0
>
>
> Test fails 2% of the time.
> [https://ge.apache.org/scans/tests?search.timeZoneId=Europe/Berlin=kafka.log.remote.RemoteIndexCacheTest=testClose()]
>  
> This test should be modified to test 
> assertTrue(cache.cleanerThread.isShutdownComplete) in a 
> TestUtils.waitUntilTrue condition which will catch the InterruptedException 
> and exit successfully on it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request

2024-01-29 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812155#comment-17812155
 ] 

Kamal Chandraprakash commented on KAFKA-15776:
--

I've opened a KIP to add a new `fetch.remote.max.wait.ms` dynamic config: 
[KIP-1018|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1018%3A+Introduce+max+remote+fetch+timeout+config+for+DelayedRemoteFetch+requests].
 Please post your feedback and suggestions on the mailing thread.
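
If the KIP lands in this form, updating the value at runtime could look roughly like the 
sketch below (assumptions: the proposed name {{fetch.remote.max.wait.ms}} and that it ends up 
as a cluster-wide dynamic broker config; the final name and scope depend on the KIP outcome):
{code:java}
// Sketch only: the config name and its scope are assumptions based on the KIP proposal.
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RemoteFetchTimeoutConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Empty broker name = cluster-wide default for dynamic broker configs.
            ConfigResource cluster = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp setWait = new AlterConfigOp(
                    new ConfigEntry("fetch.remote.max.wait.ms", "2000"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> update =
                    Collections.singletonMap(cluster, Collections.singletonList(setWait));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}
{code}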

> Update delay timeout for DelayedRemoteFetch request
> ---
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
> given amount of time when there is no data available to serve the FETCH 
> request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user on how to configure optimal value for each purpose. Moreover, the config 
> is of *LOW* importance and most of the users won't configure it and use the 
> default value of 500 ms.
> Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
> higher number of expired delayed remote fetch requests when the remote 
> storage have any degradation.
> We should introduce one {{fetch.remote.max.wait.ms}} config (preferably 
> server config) to define the delay timeout for DelayedRemoteFetch requests 
> (or) take it from client similar to {{request.timeout.ms}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request

2024-01-21 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809117#comment-17809117
 ] 

Kamal Chandraprakash commented on KAFKA-15776:
--

> I think having to configure a very high fetch.max.wait defeat the purpose of 
> the KIP of not having to proceed adaptations on the consumer side.

Kindly elaborate on this. 

> Update delay timeout for DelayedRemoteFetch request
> ---
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
> given amount of time when there is no data available to serve the FETCH 
> request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user on how to configure optimal value for each purpose. Moreover, the config 
> is of *LOW* importance and most of the users won't configure it and use the 
> default value of 500 ms.
> Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
> higher number of expired delayed remote fetch requests when the remote 
> storage have any degradation.
> We should introduce one {{fetch.remote.max.wait.ms}} config (preferably 
> server config) to define the delay timeout for DelayedRemoteFetch requests 
> (or) take it from client similar to {{request.timeout.ms}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request

2024-01-21 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809116#comment-17809116
 ] 

Kamal Chandraprakash commented on KAFKA-15776:
--

[~fvisconte] 

We are cancelling the currently executing fetch 
[task|https://sourcegraph.com/github.com/apache/kafka@92a67e8571500a53cc864ba6df4cb9cfdac6a763/-/blob/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala?L86]
 when the timeout happens. When the remote storage degrades, the consumer 
may not be able to make progress. I'll open a discussion thread to discuss this.

One approach is to not cancel the currently executing remote fetch task, but instead 
cache the result in the storage manager so that a subsequent consumer FETCH 
request (for the same fetch-offset) can be served from the cache.
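
A rough sketch of that caching idea (purely illustrative, with hypothetical class and 
method names; not existing RemoteLogManager code):
{code:java}
// Illustrative sketch only: a FETCH that expires in the purgatory does not cancel the
// remote read; a retried FETCH for the same partition/offset joins the same future (or
// its completed result) instead of re-reading from remote storage.
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

public class RemoteFetchResultCache<R> {

    private static final class Key {
        final String topicPartition;
        final long fetchOffset;
        Key(String topicPartition, long fetchOffset) {
            this.topicPartition = topicPartition;
            this.fetchOffset = fetchOffset;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return fetchOffset == k.fetchOffset && topicPartition.equals(k.topicPartition);
        }
        @Override public int hashCode() { return Objects.hash(topicPartition, fetchOffset); }
    }

    private final ConcurrentMap<Key, CompletableFuture<R>> reads = new ConcurrentHashMap<>();

    /** Starts a remote read only if none is already cached for this partition/offset. */
    public CompletableFuture<R> readOrJoin(String topicPartition, long fetchOffset,
                                           Supplier<R> remoteRead) {
        // Eviction (size bound / TTL) is deliberately omitted from this sketch.
        return reads.computeIfAbsent(new Key(topicPartition, fetchOffset),
                k -> CompletableFuture.supplyAsync(remoteRead));
    }
}
{code}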

> Update delay timeout for DelayedRemoteFetch request
> ---
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
> given amount of time when there is no data available to serve the FETCH 
> request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user on how to configure optimal value for each purpose. Moreover, the config 
> is of *LOW* importance and most of the users won't configure it and use the 
> default value of 500 ms.
> Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
> higher number of expired delayed remote fetch requests when the remote 
> storage have any degradation.
> We should introduce one {{fetch.remote.max.wait.ms}} config (preferably 
> server config) to define the delay timeout for DelayedRemoteFetch requests 
> (or) take it from client similar to {{request.timeout.ms}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException

2024-01-16 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17807397#comment-17807397
 ] 

Kamal Chandraprakash commented on KAFKA-16105:
--

[~anatolypopov] 

Could you write an integration test to simulate the error scenario? You can 
refer to some of the existing 
[tests|https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java].
 Thanks!

> Reassignment of tiered topics is failing due to RemoteStorageException
> --
>
> Key: KAFKA-16105
> URL: https://issues.apache.org/jira/browse/KAFKA-16105
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Reporter: Anatolii Popov
>Priority: Critical
>
> When partition reassignment is happening for a tiered topic in most of the 
> cases it's stuck with RemoteStorageException's on follower nodes saying that 
> it can not construct remote log auxilary state:
>  
> {code:java}
> [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, 
> fetcherId=2] Error building remote log auxiliary state for test-24 
> (kafka.server.ReplicaFetcherThread)
>                                          
> org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't 
> build the state from remote store for partition: test-24, currentLeaderEpoch: 
> 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the 
> previous remote log segment metadata was not found
>                                                  at 
> kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259)
>                                                  at 
> kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106)
>                                                  at 
> kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413)
>                                                  at 
> scala.Option.foreach(Option.scala:437)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
>                                                  at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>                                                  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
>                                                  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
>                                                  at 
> scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
>                                                  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
>                                                  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
>                                                  at 
> scala.Option.foreach(Option.scala:437)
>                                                  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
>                                                  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
>                                                  at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>                                                  at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
>  {code}
>  
> Scenario:
> A cluster of 3 nodes with a single topic with 30 partitions. All partitions 
> have tiered segments.
> Adding 3 more nodes to the cluster and making a reassignment to move all the 
> data to new nodes.
> Behavior:
> For most of the partitions reassignment is happening smoothly.
> For some of the partitions when a new node starts to get assignments it reads 
> __remote_log_metadata topic and tries to initialize the metadata cache on 
> records with 

[jira] [Updated] (KAFKA-16146) Checkpoint log-start-offset for remote log enabled topics

2024-01-16 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16146:
-
Description: 
The log-start-offset is not getting flushed to the checkpoint due to the below 
check:

[https://sourcegraph.com/github.com/apache/kafka@b16df3b103d915d33670b8156217fc6c2b473f61/-/blob/core/src/main/scala/kafka/log/LogManager.scala?L851]

  was:
The log-start-offset checkpoint is not getting updated after deleting the 
remote log segments due to the below check:

https://sourcegraph.com/github.com/apache/kafka@b16df3b103d915d33670b8156217fc6c2b473f61/-/blob/core/src/main/scala/kafka/log/LogManager.scala?L851


> Checkpoint log-start-offset for remote log enabled topics
> -
>
> Key: KAFKA-16146
> URL: https://issues.apache.org/jira/browse/KAFKA-16146
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> The log-start-offset is not getting flushed to the checkpoint due to the 
> below check:
> [https://sourcegraph.com/github.com/apache/kafka@b16df3b103d915d33670b8156217fc6c2b473f61/-/blob/core/src/main/scala/kafka/log/LogManager.scala?L851]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16146) Checkpoint log-start-offset for remote log enabled topics

2024-01-16 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16146:
-
Summary: Checkpoint log-start-offset for remote log enabled topics  (was: 
Checkpoint log-start-offset after remote log deletion)

> Checkpoint log-start-offset for remote log enabled topics
> -
>
> Key: KAFKA-16146
> URL: https://issues.apache.org/jira/browse/KAFKA-16146
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> The log-start-offset checkpoint is not getting updated after deleting the 
> remote log segments due to the below check:
> https://sourcegraph.com/github.com/apache/kafka@b16df3b103d915d33670b8156217fc6c2b473f61/-/blob/core/src/main/scala/kafka/log/LogManager.scala?L851



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16146) Checkpoint log-start-offset after remote log deletion

2024-01-16 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16146:


 Summary: Checkpoint log-start-offset after remote log deletion
 Key: KAFKA-16146
 URL: https://issues.apache.org/jira/browse/KAFKA-16146
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


The log-start-offset checkpoint is not getting updated after deleting the 
remote log segments due to the below check:

https://sourcegraph.com/github.com/apache/kafka@b16df3b103d915d33670b8156217fc6c2b473f61/-/blob/core/src/main/scala/kafka/log/LogManager.scala?L851
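
A simplified illustration of why such a guard can skip remote-log-enabled topics (an 
assumption about the shape of the linked check, which is Scala code in LogManager; not the 
actual implementation): if only partitions whose log-start-offset has moved past the first 
local segment's base offset are checkpointed, a tiered topic, whose log-start-offset 
normally trails its local segments, never gets written to the checkpoint file.
{code:java}
// Simplified illustration only -- an assumed guard shape, used to show how tiered topics
// can be skipped; the real check lives in LogManager (Scala).
import java.util.HashMap;
import java.util.Map;

public class LogStartOffsetCheckpointSketch {

    static final class LogState {
        final long logStartOffset;        // may point into remote storage
        final long firstLocalSegmentBase; // base offset of the first local segment
        LogState(long logStartOffset, long firstLocalSegmentBase) {
            this.logStartOffset = logStartOffset;
            this.firstLocalSegmentBase = firstLocalSegmentBase;
        }
    }

    /** Only offsets that moved past the first local segment are written out. */
    static Map<String, Long> offsetsToCheckpoint(Map<String, LogState> logs) {
        Map<String, Long> result = new HashMap<>();
        logs.forEach((tp, log) -> {
            if (log.logStartOffset > log.firstLocalSegmentBase) {
                result.put(tp, log.logStartOffset);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, LogState> logs = new HashMap<>();
        // Tiered topic: log-start-offset (100) trails the first local segment (500), so it
        // never reaches the checkpoint file even after remote deletion advances it.
        logs.put("tiered-0", new LogState(100L, 500L));
        // Local-only topic whose start offset moved past its first segment: written out.
        logs.put("local-0", new LogState(300L, 200L));
        System.out.println(offsetsToCheckpoint(logs)); // {local-0=300}
    }
}
{code}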



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16088) Not reading active segments when RemoteFetch return Empty Records.

2024-01-07 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803941#comment-17803941
 ] 

Kamal Chandraprakash commented on KAFKA-16088:
--

[~goyarpit] 

Could you please write a unit/integration test to reproduce the issue? 

>  Not reading active segments  when RemoteFetch return Empty Records.
> 
>
> Key: KAFKA-16088
> URL: https://issues.apache.org/jira/browse/KAFKA-16088
> Project: Kafka
>  Issue Type: Bug
>Reporter: Arpit Goyal
>Priority: Critical
>
> Please refer this comment for details 
> https://github.com/apache/kafka/pull/15060#issuecomment-1879657273



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-07 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803940#comment-17803940
 ] 

Kamal Chandraprakash commented on KAFKA-16073:
--

[~hzh0425@apache] 

The approach LGTM. Could you please open a PR? We can discuss the solution there.

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1, 3.8.0
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < 
> offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against 
> the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, 
> it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue 
> arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2024-01-07 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803939#comment-17803939
 ] 

Kamal Chandraprakash commented on KAFKA-15682:
--

The retention of `__remote_log_metadata` is configurable by the user.
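
For reference, a sketch of how that retention is typically configured (the property names 
are assumptions based on TopicBasedRemoteLogMetadataManagerConfig and the default 
{{rlmm.config.}} pass-through prefix; verify them against the Kafka version in use):
{code:java}
// Hedged sketch: "remote.log.metadata.topic.retention.ms" and the "rlmm.config." prefix
// are assumptions about the TopicBasedRemoteLogMetadataManager configuration; verify
// against your Kafka version before relying on them.
import java.util.Properties;

public class RlmmRetentionSketch {
    public static void main(String[] args) {
        Properties brokerProps = new Properties();
        brokerProps.put("remote.log.storage.system.enable", "true");
        // Keep __remote_log_metadata for 30 days -- it must outlive the longest remote
        // retention configured on any user topic (the "max + X" rule discussed below).
        brokerProps.put("rlmm.config.remote.log.metadata.topic.retention.ms",
                String.valueOf(30L * 24 * 60 * 60 * 1000));
        brokerProps.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
{code}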

> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Phuc Hong Tran
>Priority: Major
>
> One of the implementation of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
> {{__remote_log_metadata}} to store the metadata about the remote log 
> segments. Unlike other internal topics which are compaction enabled, this 
> topic is not enabled with compaction and retention is set to unlimited. 
> Keeping this internal topic retention to unlimited is not practical in real 
> world use-case where the topic local disk usage footprint grow huge over a 
> period of time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on the assumption and need an assertion to ensure that the internal 
> {{__remote_log_metadata}} segments are not eligible for deletion before the 
> expiry of all the relevant user-topic uploaded remote-log-segments , 
> otherwise there will be dangling remote-log-segments which won't be cleared 
> once all the brokers are restarted post the internal topic retention cleanup.
> See the discussion thread: 
> https://github.com/apache/kafka/pull/14576#discussion_r1368576126



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15876) Introduce Remote Storage Not Ready Exception

2024-01-07 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15876:
-
Fix Version/s: 3.8.0

> Introduce Remote Storage Not Ready Exception
> 
>
> Key: KAFKA-15876
> URL: https://issues.apache.org/jira/browse/KAFKA-15876
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> When tiered storage is enabled on the cluster, Kafka broker has to build the 
> remote log metadata for all the partitions that it is either leader/follower 
> on node restart. The remote log metadata is built in asynchronous fashion and 
> does not interfere with the broker startup path. Once the broker becomes 
> online, it cannot handle the client requests (FETCH and LIST_OFFSETS) to 
> access remote storage until the metadata gets built for those partitions. 
> Currently, we are returning a ReplicaNotAvailable exception back to the 
> client so that it will retry after sometime.
> [ReplicaNotAvailableException|https://sourcegraph.com/github.com/apache/kafka@254335d24ab6b6d13142dcdb53fec3856c16de9e/-/blob/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java]
>  is applicable when there is a reassignment is in-progress and kind of 
> deprecated with the NotLeaderOrFollowerException 
> ([PR#8979|https://github.com/apache/kafka/pull/8979]). It's good to introduce 
> an appropriate retriable exception for remote storage errors to denote that 
> it is not ready to accept the client requests yet.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-05 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803486#comment-17803486
 ] 

Kamal Chandraprakash commented on KAFKA-16073:
--

[~hzh0425@apache] 

Are you working on this issue? Shall I take it over?

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1, 3.8.0
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < 
> offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against 
> the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, 
> it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue 
> arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer

2024-01-02 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801817#comment-17801817
 ] 

Kamal Chandraprakash commented on KAFKA-15777:
--

[~isding_l] 

This task requires a KIP, as we may have to add a new config to the 
consumer/broker.
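
To make the scope concrete, the sketch below shows the existing consumer fetch-size configs 
next to the proposed one ({{fetch.max.bytes}} and {{max.partition.fetch.bytes}} exist today; 
{{max.remote.partition.fetch.bytes}} is hypothetical and only illustrates this ticket's 
proposal):
{code:java}
// Existing consumer fetch-size configs alongside the *hypothetical*
// max.remote.partition.fetch.bytes proposed in this ticket (it does not exist today).
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RemotePartitionFetchBytesSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tiered-reader");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);      // 50 MB per FETCH response
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024); // 1 MB per partition
        props.put("max.remote.partition.fetch.bytes", 4 * 1024 * 1024);          // hypothetical new config
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("tiered-topic"));
        }
    }
}
{code}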

> Configurable remote fetch bytes per partition from Consumer
> ---
>
> Key: KAFKA-15777
> URL: https://issues.apache.org/jira/browse/KAFKA-15777
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> A consumer can configure the amount of local bytes to read from each 
> partition in the FETCH request.
> {{max.fetch.bytes}} = 50 MB
> {{max.partition.fetch.bytes}} = 1 MB
> Similar to this, the consumer should be able to configure 
> {{max.remote.partition.fetch.bytes}} = 4 MB.
> While handling the {{FETCH}} request, if we encounter a partition to read 
> data from remote storage, then rest of the partitions in the request are 
> ignored. Essentially, we are serving only 1 MB of remote data per FETCH 
> request when all the partitions in the request are to be served from the 
> remote storage.
> Providing one more configuration to the client help the user to tune the 
> values depending on their storage plugin. The user might want to optimise the 
> number of calls to remote storage vs amount of bytes returned back to the 
> client in the FETCH response.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer

2024-01-02 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15777:
-
Labels: kip  (was: )

> Configurable remote fetch bytes per partition from Consumer
> ---
>
> Key: KAFKA-15777
> URL: https://issues.apache.org/jira/browse/KAFKA-15777
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip
>
> A consumer can configure the amount of local bytes to read from each 
> partition in the FETCH request.
> {{max.fetch.bytes}} = 50 MB
> {{max.partition.fetch.bytes}} = 1 MB
> Similar to this, the consumer should be able to configure 
> {{max.remote.partition.fetch.bytes}} = 4 MB.
> While handling the {{FETCH}} request, if we encounter a partition to read 
> data from remote storage, then rest of the partitions in the request are 
> ignored. Essentially, we are serving only 1 MB of remote data per FETCH 
> request when all the partitions in the request are to be served from the 
> remote storage.
> Providing one more configuration to the client help the user to tune the 
> values depending on their storage plugin. The user might want to optimise the 
> number of calls to remote storage vs amount of bytes returned back to the 
> client in the FETCH response.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15166) Add deletePartition API to the RemoteStorageManager

2023-12-03 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15166:
-
Labels: kip  (was: )

> Add deletePartition API to the RemoteStorageManager
> ---
>
> Key: KAFKA-15166
> URL: https://issues.apache.org/jira/browse/KAFKA-15166
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> Remote Storage Manager exposes {{deleteLogSegmentData}} API to delete the 
> individual log segments. Storage providers such as HDFS have support to 
> delete a directory. Having an {{deletePartition}} API to delete the data at 
> the partition level will enhance the topic deletion.
> This task may require a KIP as it touches the user-facing APIs.
>  
> Please also remember to remove the comment on the test here: 
> https://github.com/apache/kafka/pull/13837#discussion_r1247676834



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15931) Cached transaction index gets closed if tiered storage read is interrupted

2023-11-29 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791140#comment-17791140
 ] 

Kamal Chandraprakash commented on KAFKA-15931:
--

[~ivanyu] 

Thanks for filing this issue! While working on KAFKA-15047, we noticed that 
some of the 
[TransactionsWithTieredStoreTest|https://ge.apache.org/s/ofqqovlfxqpwa/tests/task/:storage:test/details/org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest]
 runs failed with this error.

> Cached transaction index gets closed if tiered storage read is interrupted
> --
>
> Key: KAFKA-15931
> URL: https://issues.apache.org/jira/browse/KAFKA-15931
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Ivan Yurchenko
>Priority: Minor
>
> This reproduces when reading from remote storage with the default 
> {{fetch.max.wait.ms}} (500) or lower. {{isolation.level=read_committed}} is 
> needed to trigger this.
> It's not easy to reproduce on local-only setups, unfortunately, because reads 
> are fast and aren't interrupted.
> This error is logged
> {noformat}
> [2023-11-29 14:01:01,166] ERROR Error occurred while reading the remote data 
> for topic1-0 (kafka.log.remote.RemoteLogReader)
> org.apache.kafka.common.KafkaException: Failed read position from the 
> transaction index 
>     at 
> org.apache.kafka.storage.internals.log.TransactionIndex$1.hasNext(TransactionIndex.java:235)
>     at 
> org.apache.kafka.storage.internals.log.TransactionIndex.collectAbortedTxns(TransactionIndex.java:171)
>     at 
> kafka.log.remote.RemoteLogManager.collectAbortedTransactions(RemoteLogManager.java:1359)
>     at 
> kafka.log.remote.RemoteLogManager.addAbortedTransactions(RemoteLogManager.java:1341)
>     at kafka.log.remote.RemoteLogManager.read(RemoteLogManager.java:1310)
>     at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:62)
>     at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:31)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.nio.channels.ClosedChannelException
>     at 
> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>     at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
>     at 
> org.apache.kafka.storage.internals.log.TransactionIndex$1.hasNext(TransactionIndex.java:233)
>     ... 10 more
> {noformat}
> and after that this txn index becomes unusable until the process is restarted.
> I suspect, it's caused by the reading thread being interrupted due to the 
> fetch timeout. At least [this 
> code|https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.base/share/classes/java/nio/channels/spi/AbstractInterruptibleChannel.java#L159-L160]
>  in {{AbstractInterruptibleChannel}} is called.
> Fixing may be easy: reopen the channel in {{TransactionIndex}} if it's close. 
> However, off the top of my head I can't say if there are some less obvious 
> implications of this change.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15876) Introduce Remote Storage Not Ready Exception

2023-11-22 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15876:
-
Labels: kip  (was: )

> Introduce Remote Storage Not Ready Exception
> 
>
> Key: KAFKA-15876
> URL: https://issues.apache.org/jira/browse/KAFKA-15876
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip
>
> When tiered storage is enabled on the cluster, Kafka broker has to build the 
> remote log metadata for all the partitions that it is either leader/follower 
> on node restart. The remote log metadata is built in asynchronous fashion and 
> does not interfere with the broker startup path. Once the broker becomes 
> online, it cannot handle the client requests (FETCH and LIST_OFFSETS) to 
> access remote storage until the metadata gets built for those partitions. 
> Currently, we are returning a ReplicaNotAvailable exception back to the 
> client so that it will retry after sometime.
> [ReplicaNotAvailableException|https://sourcegraph.com/github.com/apache/kafka@254335d24ab6b6d13142dcdb53fec3856c16de9e/-/blob/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java]
>  is applicable when there is a reassignment is in-progress and kind of 
> deprecated with the NotLeaderOrFollowerException 
> ([PR#8979|https://github.com/apache/kafka/pull/8979]). It's good to introduce 
> an appropriate retriable exception for remote storage errors to denote that 
> it is not ready to accept the client requests yet.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15876) Introduce Remote Storage Not Ready Exception

2023-11-22 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15876:


 Summary: Introduce Remote Storage Not Ready Exception
 Key: KAFKA-15876
 URL: https://issues.apache.org/jira/browse/KAFKA-15876
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


When tiered storage is enabled on the cluster, the Kafka broker has to build the 
remote log metadata for all the partitions for which it is either the leader or a follower 
on node restart. The remote log metadata is built in an asynchronous fashion and does 
not interfere with the broker startup path. Once the broker comes online, it 
cannot handle the client requests (FETCH and LIST_OFFSETS) that access remote 
storage until the metadata gets built for those partitions. Currently, we are 
returning a ReplicaNotAvailable exception back to the client so that it will 
retry after some time.

[ReplicaNotAvailableException|https://sourcegraph.com/github.com/apache/kafka@254335d24ab6b6d13142dcdb53fec3856c16de9e/-/blob/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java]
 is applicable when a reassignment is in progress and is effectively 
deprecated by NotLeaderOrFollowerException 
([PR#8979|https://github.com/apache/kafka/pull/8979]). It's good to introduce 
an appropriate retriable exception for remote storage errors to denote that the broker 
is not ready to accept the client requests yet.
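
A sketch of what such a retriable exception could look like (illustrative only; the actual 
class name, package, and error-code mapping are for the KIP to define):
{code:java}
// Illustrative sketch only -- the real exception and its error-code wiring are defined
// by the KIP; shown here simply as a subclass of Kafka's RetriableException.
package org.apache.kafka.common.errors;

/**
 * Retriable error indicating that the broker has not yet finished building the
 * remote-log metadata for the partition, so remote reads cannot be served yet.
 */
public class RemoteStorageNotReadyException extends RetriableException {

    public RemoteStorageNotReadyException(String message) {
        super(message);
    }

    public RemoteStorageNotReadyException(String message, Throwable cause) {
        super(message, cause);
    }
}
{code}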



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15859) Introduce delayed remote list offsets to make LIST_OFFSETS async

2023-11-20 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15859:


 Summary: Introduce delayed remote list offsets to make 
LIST_OFFSETS async
 Key: KAFKA-15859
 URL: https://issues.apache.org/jira/browse/KAFKA-15859
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


LIST_OFFSETS API requests are handled by the request handler threads. If there 
are more concurrent LIST_OFFSETS requests to remote storage than request handler threads, 
then other requests such as FETCH and PRODUCE might starve and be queued. This can lead to 
higher latency in producing/consuming messages.

The `offsetForTimes` call to remote storage can take time, as it has to fetch 
the offset and time indexes to serve the request, so moving these requests to a 
purgatory and handling them via the remote-log-reader threads frees up the request 
handler threads to serve other requests.
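
For context, this is the kind of client call that turns into such a LIST_OFFSETS request; 
when the requested timestamp falls inside tiered data, serving it requires fetching the 
remote offset/time indexes (sketch with a placeholder topic and timestamp):
{code:java}
// Sketch: a timestamp lookup that becomes a remote LIST_OFFSETS when the offset for the
// timestamp lives in a tiered segment. Topic name and timestamp are placeholders.
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RemoteListOffsetsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("tiered-topic", 0);
            long sevenDaysAgo = System.currentTimeMillis() - Duration.ofDays(7).toMillis();
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, sevenDaysAgo));
            // The broker resolves this via the (possibly remote) time and offset indexes.
            System.out.println(offsets.get(tp));
        }
    }
}
{code}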



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2023-11-17 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-14877:


Assignee: (was: Kamal Chandraprakash)

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Priority: Minor
> Fix For: 3.7.0
>
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2023-11-16 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786768#comment-17786768
 ] 

Kamal Chandraprakash edited comment on KAFKA-14877 at 11/16/23 2:20 PM:


[~showuon] [~divijvaidya] 

The task expectation is not clear from the discussion. Do we want to avoid creating the 
intermediary LeaderEpochFileCache instance (by adding a couple of methods, 
`truncateFromStart` and `truncateFromEnd`, on the InMemoryLeaderEpochCheckpoint class)?

Since the InMemoryLeaderEpochCheckpoint stores the data in memory, there is 
almost negligible cost in doing the `flush`.


was (Author: ckamal):
[~showuon] [~divijvaidya] 

The task expectation is not clear from the discussion. Do we want to avoid the 
creation of intermediary LeaderEpochFileCache instance? (by adding couple of 
methods: `truncateFromStart` and `truncateFromEnd on the 
InMemoryLeaderEpochCheckpoint class)

Since the InMemoryLeaderEpochCheckpoint stores the data in memory, there is 
almost negligible cost in doing the `flush`.

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Kamal Chandraprakash
>Priority: Minor
> Fix For: 3.7.0
>
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2023-11-16 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786768#comment-17786768
 ] 

Kamal Chandraprakash edited comment on KAFKA-14877 at 11/16/23 2:20 PM:


[~showuon] [~divijvaidya] 

The task expectation is not clear from the discussion. Do we want to avoid the 
creation of intermediary LeaderEpochFileCache instance? (by adding couple of 
methods: `truncateFromStart` and `truncateFromEnd on the 
InMemoryLeaderEpochCheckpoint class)

Since the InMemoryLeaderEpochCheckpoint stores the data in memory, there is 
almost negligible cost in doing the `flush`.


was (Author: ckamal):
[~showuon] [~divijvaidya] 

The expectation from this task is not clear from the discussion. Do we want to 
avoid the creation of intermediary LeaderEpochFileCache instance? (by adding 
couple of methods: `truncateFromStart` and `truncateFromEnd on the 
InMemoryLeaderEpochCheckpoint class)

Since the InMemoryLeaderEpochCheckpoint stores the data in memory, there is 
almost negligible cost in doing the `flush`.

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Kamal Chandraprakash
>Priority: Minor
> Fix For: 3.7.0
>
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2023-11-16 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786768#comment-17786768
 ] 

Kamal Chandraprakash commented on KAFKA-14877:
--

[~showuon] [~divijvaidya] 

The expectation from this task is not clear from the discussion. Do we want to 
avoid the creation of intermediary LeaderEpochFileCache instance? (by adding 
couple of methods: `truncateFromStart` and `truncateFromEnd on the 
InMemoryLeaderEpochCheckpoint class)

Since the InMemoryLeaderEpochCheckpoint stores the data in memory, there is 
almost negligible cost in doing the `flush`.

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Kamal Chandraprakash
>Priority: Minor
> Fix For: 3.7.0
>
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2023-11-16 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-14877:


Assignee: Kamal Chandraprakash

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Kamal Chandraprakash
>Priority: Minor
> Fix For: 3.7.0
>
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request

2023-11-16 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15776:
-
Summary: Update delay timeout for DelayedRemoteFetch request  (was: 
Configurable delay timeout for DelayedRemoteFetch request)

> Update delay timeout for DelayedRemoteFetch request
> ---
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
> given amount of time when there is no data available to serve the FETCH 
> request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user on how to configure optimal value for each purpose. Moreover, the config 
> is of *LOW* importance and most of the users won't configure it and use the 
> default value of 500 ms.
> Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
> higher number of expired delayed remote fetch requests when the remote 
> storage have any degradation.
> We should introduce one {{fetch.remote.max.wait.ms}} config (preferably 
> server config) to define the delay timeout for DelayedRemoteFetch requests 
> (or) take it from client similar to {{request.timeout.ms}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.

2023-11-14 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785956#comment-17785956
 ] 

Kamal Chandraprakash edited comment on KAFKA-15376 at 11/14/23 4:29 PM:


[~divijvaidya] 

The [example|https://github.com/apache/kafka/pull/13561#discussion_r1293286722] 
provided in the discussion is misleading. Let's divide the example into two to 
navigate it easier:

Assume that there are two replicas Broker A and Broker B for partition tp0:

*Case-1*
Both replicas A and B are in sync on startup and hold leader-epoch 0. Then the 
brokers start to go down in a ping-pong fashion. Each broker will 
hold the following epochs in its leader-epoch-checkpoint file:

A: 0, 2, 4, 6, 8
B: 0, 1, 3, 5, 7

Since this is an unclean leader election, the logs of Broker A and B might have 
diverged. As long as either of them is online, it continues to serve all the 
records according to its leader-epoch-checkpoint file. Once both brokers 
come back online, the follower truncates itself up to the largest common log 
prefix offset so that the logs do not diverge between the leader and 
follower. In this case, we continue to serve the data from remote storage, 
as no segments will be removed due to leader-epoch-cache truncation since both 
of them hold LE0.

Note that the approach taken here is similar to the local log, where each broker 
serves the log it has until the replicas sync with each other.

*Case-2*
Both replicas A and B are out of sync on startup and the follower doesn't 
hold leader-epoch 0. Assume that Broker A is the leader and B is the follower and 
holds no data for the partition (empty disk). When Broker A goes 
down, the partition goes offline; once B is elected as the unclean leader, 
the log-end-offset of the partition is reset back to 0.

From the example provided in the discussion:

At T1, Broker A
{code:java}
-
leader-epoch | start-offset |
-
 0  0
 1  180
 2  400
- {code}
At T2, on Broker B, the start-offset is reset back to 0. (Note that the 
leader does not interact with remote storage to find the next offset; this is a 
trade-off between availability and durability.)
{code:java}
-
leader-epoch | start-offset |
-
 3  0
 4  780
 6  900
 7  990 
- {code}
Now, if we hold the data for both lineages and ping-pong the brokers, we 
will serve diverged data back to the client for the same fetch-offset, 
depending on which broker is online. Once the replicas start to interact 
with each other, they truncate the remote data themselves based on the current 
leader-epoch lineage.

The example provided in the discussion is applicable only for case-2, where the 
replicas have never interacted with each other even once. 


was (Author: ckamal):
[~divijvaidya] 

The [example|https://github.com/apache/kafka/pull/13561#discussion_r1293286722] 
provided in the discussion is misleading. Let's divide the example into two to 
navigate it easier:


Assume that there are two replicas Broker A and Broker B for partition tp0:

*Case-1*
Both the replicas A and B are insync on startup and they hold the leader-epoch 
0. Then, the brokers started to go down in ping-pong fashion. Each broker will 
hold the following epoch in it's leader-epoch-checkpoint file:

A: 0, 2, 4, 6, 8
B: 0, 1, 3, 5, 7

Since this is unclean-leader-election, the logs of Broker A and B might be 
diverged. As long as anyone of them is online, they continue to serve all the 
records according to the leader-epoch-checkpoint file. Once both the brokers 
becomes online, the follower truncates itself up-to the largest common log 
prefix offset so that the logs won't be diverged between the leader and 
follower. In this case, we continue to serve the data from the remote storage 
as no segments will be removed due to leader-epoch-cache truncation since both 
of them holds the LE0. 

Note that the approach taken here is similar to local-log where the broker will 
serve the log that they have until they sync with each other.

*Case-2*
Both the replicas A and B are out-of-sync on startup and the follower doesn't 
hold leader-epoch 0. Assume that Broker A is the leader and B is the follower & 
doesn't hold any data about the partition (empty-disk). When the Broker A goes 
down, there will be offline partition and B will be elected as unclean leader, 
the log-end-offset of the partition will be reset back to 0.

From the example provided in the discussion:

At T1, Broker A

{code:java}
-
leader-epoch | start-offset |
-

[jira] [Commented] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.

2023-11-14 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785962#comment-17785962
 ] 

Kamal Chandraprakash commented on KAFKA-15376:
--

With unclean leader election enabled, there can be log divergence and log loss, 
and exactly-once delivery is not applicable. We are trying to extend the same 
contract that applies to local storage to remote storage when this feature is enabled. 
There are pros and cons to this feature:

*Pros*

1. The replica will serve the data that it has seen so far back to the client, even 
if it has never interacted with any other replica.

*Cons*

1. RemoteStorageManager / RemoteLogManager will have additional work to 
maintain the unreferenced segments and clean them up.

> Explore options of removing data earlier to the current leader's leader epoch 
> lineage for topics enabled with tiered storage.
> -
>
> Key: KAFKA-15376
> URL: https://issues.apache.org/jira/browse/KAFKA-15376
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>
> Followup on the discussion thread:
> [https://github.com/apache/kafka/pull/13561#discussion_r1288778006]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.

2023-11-14 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785956#comment-17785956
 ] 

Kamal Chandraprakash commented on KAFKA-15376:
--

[~divijvaidya] 

The [example|https://github.com/apache/kafka/pull/13561#discussion_r1293286722] 
provided in the discussion is misleading. Let's divide the example into two to 
navigate it easier:


Assume that there are two replicas Broker A and Broker B for partition tp0:

*Case-1*
Both the replicas A and B are insync on startup and they hold the leader-epoch 
0. Then, the brokers started to go down in ping-pong fashion. Each broker will 
hold the following epoch in it's leader-epoch-checkpoint file:

A: 0, 2, 4, 6, 8
B: 0, 1, 3, 5, 7

Since this is unclean-leader-election, the logs of Broker A and B might be 
diverged. As long as anyone of them is online, they continue to serve all the 
records according to the leader-epoch-checkpoint file. Once both the brokers 
becomes online, the follower truncates itself up-to the largest common log 
prefix offset so that the logs won't be diverged between the leader and 
follower. In this case, we continue to serve the data from the remote storage 
as no segments will be removed due to leader-epoch-cache truncation since both 
of them holds the LE0. 

Note that the approach taken here is similar to local-log where the broker will 
serve the log that they have until they sync with each other.

*Case-2*
Both the replicas A and B are out-of-sync on startup and the follower doesn't 
hold leader-epoch 0. Assume that Broker A is the leader and B is the follower & 
doesn't hold any data about the partition (empty-disk). When the Broker A goes 
down, there will be offline partition and B will be elected as unclean leader, 
the log-end-offset of the partition will be reset back to 0.

From the example provided in the discussion:

At T1, Broker A

{code:java}
-
leader-epoch | start-offset |
-
 0  0
 1  180
 2  400
- {code}

At T2, Broker B, the start-offset will be reset back to 0: (Note that the 
leader does not interact with remote storage to find the next offset trade-off 
b/w availability and durabilty)
{code:java}
-
leader-epoch | start-offset |
-
 3  0
 4  780
 6  900
 7  990 
- {code}

Now, if we hold the data for both the lineage and ping-pong the brokers, we 
will be serving the diverged data back to the client for the same fetch-offset 
depends on the broker which is online. Once, the replicas start to interact 
with each other, they truncate the remote data themselves based on the current 
leader epoch lineage.

The example provided in the discussion is applicable only when the replicas 
have never interacted with each other even once. 
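
A simplified, self-contained illustration of the epoch-based truncation described in 
Case-1 above (not the actual replica fetcher code, which uses the OffsetsForLeaderEpoch 
API; the lineages and offsets are made up for the example):
{code:java}
// Simplified illustration of epoch-based follower truncation. Not the actual replica
// fetcher logic; lineages and offsets are invented for the example.
import java.util.TreeMap;

public class EpochTruncationSketch {

    /** Each lineage maps leader epoch -> start offset of that epoch. */
    static long truncationOffset(TreeMap<Integer, Long> leader, long leaderEndOffset,
                                 TreeMap<Integer, Long> follower, long followerEndOffset) {
        // Walk the follower's epochs from newest to oldest until one is also known to the leader.
        for (int epoch : follower.descendingKeySet()) {
            if (leader.containsKey(epoch)) {
                // The logs are only guaranteed identical up to the point where either
                // replica moved on to a different epoch.
                return Math.min(endOffsetOf(leader, epoch, leaderEndOffset),
                                endOffsetOf(follower, epoch, followerEndOffset));
            }
        }
        return 0L; // no common epoch: nothing in common, truncate everything
    }

    /** End offset of an epoch = start offset of the next epoch, or the log end offset. */
    static long endOffsetOf(TreeMap<Integer, Long> lineage, int epoch, long logEndOffset) {
        Integer next = lineage.higherKey(epoch);
        return next == null ? logEndOffset : lineage.get(next);
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> a = new TreeMap<>(); // leader A with epochs 0, 2, 4
        a.put(0, 0L); a.put(2, 180L); a.put(4, 400L);
        TreeMap<Integer, Long> b = new TreeMap<>(); // follower B with epochs 0, 1, 3
        b.put(0, 0L); b.put(1, 150L); b.put(3, 320L);
        // Largest common epoch is 0; A kept it until 180, B until 150, so the logs are
        // only guaranteed to match up to 150 and B truncates there before following A.
        System.out.println(truncationOffset(a, 450L, b, 350L)); // prints 150
    }
}
{code}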

> Explore options of removing data earlier to the current leader's leader epoch 
> lineage for topics enabled with tiered storage.
> -
>
> Key: KAFKA-15376
> URL: https://issues.apache.org/jira/browse/KAFKA-15376
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>
> Followup on the discussion thread:
> [https://github.com/apache/kafka/pull/13561#discussion_r1288778006]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.

2023-11-14 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15376:


Assignee: Satish Duggana  (was: Kamal Chandraprakash)

> Explore options of removing data earlier to the current leader's leader epoch 
> lineage for topics enabled with tiered storage.
> -
>
> Key: KAFKA-15376
> URL: https://issues.apache.org/jira/browse/KAFKA-15376
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>
> Followup on the discussion thread:
> [https://github.com/apache/kafka/pull/13561#discussion_r1288778006]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.

2023-11-14 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash resolved KAFKA-15376.
--
Resolution: Fixed

This task was already addressed in the code, so closing the ticket:

https://sourcegraph.com/github.com/apache/kafka@3.6/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1043-1061

> Explore options of removing data earlier to the current leader's leader epoch 
> lineage for topics enabled with tiered storage.
> -
>
> Key: KAFKA-15376
> URL: https://issues.apache.org/jira/browse/KAFKA-15376
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.7.0
>
>
> Followup on the discussion thread:
> [https://github.com/apache/kafka/pull/13561#discussion_r1288778006]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.

2023-11-14 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15376:


Assignee: Kamal Chandraprakash

> Explore options of removing data earlier to the current leader's leader epoch 
> lineage for topics enabled with tiered storage.
> -
>
> Key: KAFKA-15376
> URL: https://issues.apache.org/jira/browse/KAFKA-15376
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.7.0
>
>
> Followup on the discussion thread:
> [https://github.com/apache/kafka/pull/13561#discussion_r1288778006]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)

2023-11-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-13563:
-
Affects Version/s: 2.8.2

> FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
> ---
>
> Key: KAFKA-13563
> URL: https://issues.apache.org/jira/browse/KAFKA-13563
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.1, 2.6.2, 3.0.0, 2.8.2
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.2.0, 3.1.1
>
> Attachments: kafka.zip
>
>
> In KAFKA-10793, we fixed the race condition when looking up the coordinator by 
> clearing the _findCoordinatorFuture_ when handling the result, rather than in 
> the listener callbacks. It works well under consumer group mode (i.e. 
> Consumer#subscribe), but we found that when the user is using non consumer 
> group mode (i.e. Consumer#assign) with a group id provided (for offset commits, 
> so that a ConsumerCoordinator is created), the _findCoordinatorFuture_ will 
> never be cleared in some situations, causing offset commits to keep getting the 
> NOT_COORDINATOR error.
>  
> After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places:
>  # heartbeat thread
>  # AbstractCoordinator#ensureCoordinatorReady
> But in non consumer group mode with a group id provided, there will be no (1) 
> heartbeat thread, and (2) AbstractCoordinator#ensureCoordinatorReady is only 
> called the first time the consumer wants to fetch the committed offset 
> position. That is, after the 2nd lookupCoordinator call, we have no chance to 
> clear the _findCoordinatorFuture_.
>  
> To avoid the race condition mentioned in KAFKA-10793, it's not safe to clear 
> the _findCoordinatorFuture_ in the future listener. So, I think we can fix this 
> issue by calling AbstractCoordinator#ensureCoordinatorReady when the 
> coordinator is unknown in the non consumer group case, on each Consumer#poll.
>  
> Reproduce steps:
>  
> 1. Start a 3 Broker cluster with a Topic having Replicas=3.
> 2. Start a Client with Producer and Consumer (with Consumer#assign(), not 
> subscribe, and provide a group id) communicating over the Topic.
> 3. Stop the Broker that is acting as the Group Coordinator.
> 4. Observe successful Rediscovery of new Group Coordinator.
> 5. Restart the stopped Broker.
> 6. Stop the Broker that became the new Group Coordinator at step 4.
> 7. Observe "Rediscovery will be attempted" message but no "Discovered group 
> coordinator" message.
>  
>  
>  
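
Below is a hedged, self-contained sketch of the client-side setup from the 
reproduction steps above: an assign-mode consumer with a group id set only for 
offset commits. The broker address and topic name are placeholders; the snippet 
only illustrates the mode in which the bug manifests, it does not reproduce the 
broker failovers.
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignModeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "assign-mode-group");       // group id used only for offset commits
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Consumer#assign: no group membership and no heartbeat thread, but a
            // ConsumerCoordinator is still created because group.id is set.
            consumer.assign(List.of(new TopicPartition("test-topic", 0)));
            while (true) {
                consumer.poll(Duration.ofMillis(500));
                // Commits go to the group coordinator; on affected versions, losing
                // the coordinator twice (steps 3-6 above) can leave commits failing
                // with NOT_COORDINATOR because the coordinator is never rediscovered.
                consumer.commitSync();
            }
        }
    }
}
{code}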



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15776) Configurable delay timeout for DelayedRemoteFetch request

2023-11-03 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15776:
-
Description: 
We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait for 
the given amount of time when there is not enough data available to serve the 
FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user 
about how to configure an optimal value for each purpose. Moreover, the config 
is of *LOW* importance, and most users won't configure it and will use the 
default value of 500 ms.

Having a delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to a 
higher number of expired delayed remote fetch requests when the remote storage 
has any degradation.

We should introduce a {{fetch.remote.max.wait.ms}} config (preferably a server 
config) to define the delay timeout for DelayedRemoteFetch requests, or take it 
from the client similar to {{request.timeout.ms}}.

  was:
We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
given amount of time when there is no data available to serve the FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user 
on how to configure optimal value for each purpose. Moreover, the config is of 
*LOW* importance and most of the users won't configure it and use the default 
value of 500 ms.

Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
higher number of expired delayed remote fetch requests when the remote storage 
have any degradation.

We should introduce one config (preferably server config) to define the delay 
timeout for DelayedRemoteFetch requests (or) take it from client similar to 
{{request.timeout.ms}}.


> Configurable delay timeout for DelayedRemoteFetch request
> -
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait 
> for the given amount of time when there is not enough data available to serve 
> the FETCH request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user about how to configure an optimal value for each purpose. Moreover, the 
> config is of *LOW* importance, and most users won't configure it and will use 
> the default value of 500 ms.
> Having a delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to a 
> higher number of expired delayed remote fetch requests when the remote 
> storage has any degradation.
> We should introduce a {{fetch.remote.max.wait.ms}} config (preferably a 
> server config) to define the delay timeout for DelayedRemoteFetch requests, 
> or take it from the client similar to {{request.timeout.ms}}.
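
To illustrate the proposal (not existing Kafka behaviour), here is a minimal 
Java sketch of the intended precedence: use the proposed 
{{fetch.remote.max.wait.ms}} when it is set, and fall back to the client's 
{{fetch.max.wait.ms}} only when it is not. The config name is the one proposed 
in this ticket and does not exist in released Kafka versions.
{code:java}
public final class RemoteFetchDelayTimeout {

    // remoteFetchMaxWaitMs: the proposed fetch.remote.max.wait.ms (null when unset).
    // clientFetchMaxWaitMs: fetch.max.wait.ms carried by the FETCH request.
    static long delayMs(Long remoteFetchMaxWaitMs, long clientFetchMaxWaitMs) {
        return remoteFetchMaxWaitMs != null ? remoteFetchMaxWaitMs : clientFetchMaxWaitMs;
    }

    public static void main(String[] args) {
        // Today: the 500 ms default of fetch.max.wait.ms also bounds DelayedRemoteFetch expiry.
        System.out.println(delayMs(null, 500L));     // 500
        // With the proposed server config: remote reads get a separate, larger budget.
        System.out.println(delayMs(30_000L, 500L));  // 30000
    }
}
{code}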



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15776) Configurable delay timeout for DelayedRemoteFetch request

2023-11-02 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15776:
-
Description: 
We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
given amount of time when there is no data available to serve the FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user 
on how to configure optimal value for each purpose. Moreover, the config is of 
*LOW* importance and most of the users won't configure it and use the default 
value of 500 ms.

Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
higher number of expired delayed remote fetch requests when the remote storage 
have any degradation.

We should introduce one config (preferably server config) to define the delay 
timeout for DelayedRemoteFetch requests (or) take it from client similar to 
{{request.timeout.ms}}.

  was:
We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
given amount of time when there is no data available to serve the FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user 
on how to configure optimal value for each purpose. Moreover, the config is of 
*LOW* importance and most of the users won't configure it and use the default 
value of 500 ms.

Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
higher number of expired delayed remote fetch request when the remote storage 
have degrades to serve within the timeout.

We should introduce one config (preferably server config) to define the delay 
timeout for DelayedRemoteFetch requests (or) take it from client similar to 
{{request.timeout.ms}}.


> Configurable delay timeout for DelayedRemoteFetch request
> -
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
> given amount of time when there is no data available to serve the FETCH 
> request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user on how to configure optimal value for each purpose. Moreover, the config 
> is of *LOW* importance and most of the users won't configure it and use the 
> default value of 500 ms.
> Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
> higher number of expired delayed remote fetch requests when the remote 
> storage have any degradation.
> We should introduce one config (preferably server config) to define the delay 
> timeout for DelayedRemoteFetch requests (or) take it from client similar to 
> {{request.timeout.ms}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer

2023-11-02 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15777:
-
Description: 
A consumer can configure the amount of local bytes to read from each partition 
in the FETCH request.

{{fetch.max.bytes}} = 50 MB
{{max.partition.fetch.bytes}} = 1 MB

Similar to this, the consumer should be able to configure 
{{max.remote.partition.fetch.bytes}} = 4 MB.

While handling the {{FETCH}} request, if we encounter a partition to read data 
from remote storage, then the rest of the partitions in the request are ignored. 
Essentially, we are serving only 1 MB of remote data per FETCH request when all 
the partitions in the request are to be served from the remote storage.

Providing one more configuration to the client helps the user tune the values 
depending on their storage plugin. The user might want to optimise the number 
of calls to remote storage vs. the amount of bytes returned to the client in 
the FETCH response.

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]

  was:
A consumer can configure the amount of local bytes to read from each partition 
in the FETCH request.

{{max.fetch.bytes}} = 50 MB
{{max.partition.fetch.bytes}} = 1 MB

Similar to this, the consumer should be able to configure 
{{max.remote.partition.fetch.bytes}} = 4 MB.

While handling the {{FETCH}} request, if we encounter a partition to read data 
from remote storage, then rest of the partitions in the request are ignored. 
Essentially, we are serving only 1 MB of remote data per FETCH request when all 
the partitions in the request are to be served from the remote storage.

Providing one more configuration to the client help the user to tune the values 
depends on their storage plugin. The user might want to optimise the number of 
calls to remote storage vs amount of bytes returned back to the client in the 
FETCH response.

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]


> Configurable remote fetch bytes per partition from Consumer
> ---
>
> Key: KAFKA-15777
> URL: https://issues.apache.org/jira/browse/KAFKA-15777
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> A consumer can configure the amount of local bytes to read from each 
> partition in the FETCH request.
> {{fetch.max.bytes}} = 50 MB
> {{max.partition.fetch.bytes}} = 1 MB
> Similar to this, the consumer should be able to configure 
> {{max.remote.partition.fetch.bytes}} = 4 MB.
> While handling the {{FETCH}} request, if we encounter a partition to read 
> data from remote storage, then the rest of the partitions in the request are 
> ignored. Essentially, we are serving only 1 MB of remote data per FETCH 
> request when all the partitions in the request are to be served from the 
> remote storage.
> Providing one more configuration to the client helps the user tune the 
> values depending on their storage plugin. The user might want to optimise the 
> number of calls to remote storage vs. the amount of bytes returned to the 
> client in the FETCH response.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]
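
A hedged sketch of how a consumer might set the proposed config alongside the 
existing fetch sizing configs. {{max.remote.partition.fetch.bytes}} is only a 
proposal in this ticket; current consumers do not recognise it, so it is passed 
as a plain string key here.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RemoteFetchSizingConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Existing consumer configs.
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);        // fetch.max.bytes = 50 MB
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);   // max.partition.fetch.bytes = 1 MB
        // Proposed in this ticket; NOT a real config in current Kafka releases.
        props.put("max.remote.partition.fetch.bytes", 4 * 1024 * 1024);
        System.out.println(props);
    }
}
{code}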



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer

2023-11-02 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15777:
-
Description: 
A consumer can configure the amount of local bytes to read from each partition 
in the FETCH request.

{{max.fetch.bytes}} = 50 MB
{{max.partition.fetch.bytes}} = 1 MB

Similar to this, the consumer should be able to configure 
{{max.remote.partition.fetch.bytes}} = 4 MB.

While handling the {{FETCH}} request, if we encounter a partition to read data 
from remote storage, then rest of the partitions in the request are ignored. 
Essentially, we are serving only 1 MB of remote data per FETCH request when all 
the partitions in the request are to be served from the remote storage.

Providing one more configuration to the client help the user to tune the values 
depends on their storage plugin. The user might want to optimise the number of 
calls to remote storage vs amount of bytes returned back to the client in the 
FETCH response.

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]

  was:
A consumer can configure the amount of local bytes to read from each partition 
in the FETCH request.

{{max.partition.fetch.bytes}} = 1 MB

Similar to this, the consumer should be able to configure 
{{max.remote.partition.fetch.bytes}} = 4 MB.

While handling the {{FETCH}} request, if we encounter a partition to read data 
from remote storage, then rest of the partitions in the request are ignored. 
Essentially, we are serving only 1 MB of remote data per FETCH request when all 
the partitions in the request are to be served from the remote storage.

Providing one more configuration to the client help the user to tune the values 
depends on their storage plugin. The user might want to optimise the number of 
calls to remote storage vs amount of bytes returned back to the client in the 
FETCH response.

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]


> Configurable remote fetch bytes per partition from Consumer
> ---
>
> Key: KAFKA-15777
> URL: https://issues.apache.org/jira/browse/KAFKA-15777
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> A consumer can configure the amount of local bytes to read from each 
> partition in the FETCH request.
> {{max.fetch.bytes}} = 50 MB
> {{max.partition.fetch.bytes}} = 1 MB
> Similar to this, the consumer should be able to configure 
> {{max.remote.partition.fetch.bytes}} = 4 MB.
> While handling the {{FETCH}} request, if we encounter a partition to read 
> data from remote storage, then rest of the partitions in the request are 
> ignored. Essentially, we are serving only 1 MB of remote data per FETCH 
> request when all the partitions in the request are to be served from the 
> remote storage.
> Providing one more configuration to the client help the user to tune the 
> values depends on their storage plugin. The user might want to optimise the 
> number of calls to remote storage vs amount of bytes returned back to the 
> client in the FETCH response.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer

2023-11-02 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15777:


 Summary: Configurable remote fetch bytes per partition from 
Consumer
 Key: KAFKA-15777
 URL: https://issues.apache.org/jira/browse/KAFKA-15777
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


A consumer can configure the amount of local bytes to read from each partition 
in the FETCH request.

{{max.partition.fetch.bytes}} = 1 MB

Similar to this, the consumer should be able to configure 
{{max.remote.partition.fetch.bytes}} = 4 MB.

While handling the {{FETCH}} request, if we encounter a partition to read data 
from remote storage, then rest of the partitions in the request are ignored. 
Essentially, we are serving only 1 MB of remote data per FETCH request when all 
the partitions in the request are to be served from the remote storage.

Providing one more configuration to the client help the user to tune the values 
depends on their storage plugin. The user might want to optimise the number of 
calls to remote storage vs amount of bytes returned back to the client in the 
FETCH response.

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15776) Configurable delay timeout for DelayedRemoteFetch request

2023-11-02 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15776:
-
Description: 
We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
given amount of time when there is no data available to serve the FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user 
on how to configure optimal value for each purpose. Moreover, the config is of 
*LOW* importance and most of the users won't configure it and use the default 
value of 500 ms.

Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
higher number of expired delayed remote fetch request when the remote storage 
have degrades to serve within the timeout.

We should introduce one config (preferably server config) to define the delay 
timeout for DelayedRemoteFetch requests (or) take it from client similar to 
{{request.timeout.ms}}.

  was:
We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
given amount of time when there is no data available to serve the FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user 
on how to configure optimal value for each purpose. Moreover, the config is of 
*LOW* importance and most of the users won't configure it and use the default 
value of 500 ms.

Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
higher number of expired delayed remote fetch request when the remote storage 
have any degradation to serve within the timeout.

We should introduce one config (preferably server config) to define the delay 
timeout for DelayedRemoteFetch requests.


> Configurable delay timeout for DelayedRemoteFetch request
> -
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
> given amount of time when there is no data available to serve the FETCH 
> request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user on how to configure optimal value for each purpose. Moreover, the config 
> is of *LOW* importance and most of the users won't configure it and use the 
> default value of 500 ms.
> Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
> higher number of expired delayed remote fetch request when the remote storage 
> have degrades to serve within the timeout.
> We should introduce one config (preferably server config) to define the delay 
> timeout for DelayedRemoteFetch requests (or) take it from client similar to 
> {{request.timeout.ms}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15776) Configurable delay timeout for DelayedRemoteFetch request

2023-11-02 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15776:


 Summary: Configurable delay timeout for DelayedRemoteFetch request
 Key: KAFKA-15776
 URL: https://issues.apache.org/jira/browse/KAFKA-15776
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
given amount of time when there is no data available to serve the FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user 
on how to configure optimal value for each purpose. Moreover, the config is of 
*LOW* importance and most of the users won't configure it and use the default 
value of 500 ms.

Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
larger number of expired delayed remote fetch request when the remote storage 
have any degradation to serve within the timeout.

We should introduce one config (preferably server config) to define the delay 
timeout for DelayedRemoteFetch requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15776) Configurable delay timeout for DelayedRemoteFetch request

2023-11-02 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15776:
-
Description: 
We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
given amount of time when there is no data available to serve the FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user 
on how to configure optimal value for each purpose. Moreover, the config is of 
*LOW* importance and most of the users won't configure it and use the default 
value of 500 ms.

Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
higher number of expired delayed remote fetch request when the remote storage 
have any degradation to serve within the timeout.

We should introduce one config (preferably server config) to define the delay 
timeout for DelayedRemoteFetch requests.

  was:
We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
given amount of time when there is no data available to serve the FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user 
on how to configure optimal value for each purpose. Moreover, the config is of 
*LOW* importance and most of the users won't configure it and use the default 
value of 500 ms.

Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
larger number of expired delayed remote fetch request when the remote storage 
have any degradation to serve within the timeout.

We should introduce one config (preferably server config) to define the delay 
timeout for DelayedRemoteFetch requests.


> Configurable delay timeout for DelayedRemoteFetch request
> -
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
> given amount of time when there is no data available to serve the FETCH 
> request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user on how to configure optimal value for each purpose. Moreover, the config 
> is of *LOW* importance and most of the users won't configure it and use the 
> default value of 500 ms.
> Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
> higher number of expired delayed remote fetch request when the remote storage 
> have any degradation to serve within the timeout.
> We should introduce one config (preferably server config) to define the delay 
> timeout for DelayedRemoteFetch requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2023-11-01 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15682:
-
Description: 
One of the implementations of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic, 
{{__remote_log_metadata}}, to store the metadata about the remote log segments. 
Unlike other internal topics, which are compaction-enabled, this topic is not 
enabled with compaction and its retention is set to unlimited. 

Keeping this internal topic's retention unlimited is not practical in real-world 
use cases, where the topic's local disk usage footprint grows huge over a period 
of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on that assumption and need an assertion to ensure that the internal 
{{__remote_log_metadata}} segments are not eligible for deletion before the 
expiry of all the relevant user-topic uploaded remote-log segments; otherwise 
there will be dangling remote-log segments which won't be cleared once all the 
brokers are restarted after the internal topic retention cleanup.

See the discussion thread: 
https://github.com/apache/kafka/pull/14576#discussion_r1368576126

  was:
One of the implementation of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
{{__remote_log_metadata}} to store the metadata about the remote log segments. 
Unlike other internal topics which are compaction enabled, this topic is not 
enabled with compaction and retention is set to unlimited. 

Keeping this internal topic retention to unlimited is not practical in real 
world use-case where the topic local disk usage footprint grow huge over a 
period of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on the assumption and need an assertion to ensure that the internal 
{{__remote_log_metadata}} segments are not eligible for deletion before the 
expiry of all the relevant user-topic uploaded remote-log-segments , otherwise 
there will be dangling remote-log-segments which won't be cleared once all the 
brokers are restarted post the internal topic retention cleanup.


> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> One of the implementations of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic, 
> {{__remote_log_metadata}}, to store the metadata about the remote log 
> segments. Unlike other internal topics, which are compaction-enabled, this 
> topic is not enabled with compaction and its retention is set to unlimited. 
> Keeping this internal topic's retention unlimited is not practical in 
> real-world use cases, where the topic's local disk usage footprint grows huge 
> over a period of time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on that assumption and need an assertion to ensure that the internal 
> {{__remote_log_metadata}} segments are not eligible for deletion before the 
> expiry of all the relevant user-topic uploaded remote-log segments; otherwise 
> there will be dangling remote-log segments which won't be cleared once all 
> the brokers are restarted after the internal topic retention cleanup.
> See the discussion thread: 
> https://github.com/apache/kafka/pull/14576#discussion_r1368576126
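
The ticket argues that the broker should assert this invariant rather than trust 
the operator, but as a stopgap an operator can at least encode the assumption 
explicitly. Below is a hedged sketch using the Admin client to set the internal 
topic's retention to the longest user-topic retention plus some slack; the 
broker address and the concrete retention values are assumptions for 
illustration only.
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RemoteLogMetadataRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // Assumption for illustration: the longest user-topic retention is 30 days,
        // determined out of band, and we add 7 days of slack (the "max + X" above).
        long maxUserTopicRetentionMs = TimeUnit.DAYS.toMillis(30);
        long slackMs = TimeUnit.DAYS.toMillis(7);
        long metadataRetentionMs = maxUserTopicRetentionMs + slackMs;

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "__remote_log_metadata");
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("retention.ms", Long.toString(metadataRetentionMs)),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(op))).all().get();
        }
    }
}
{code}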



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15703) Update Highwatermark while building the remote log auxilary state

2023-10-27 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15703:
-
Summary: Update Highwatermark while building the remote log auxilary state  
(was: Update Highwatermark while building the remote log auxillary state)

> Update Highwatermark while building the remote log auxilary state
> -
>
> Key: KAFKA-15703
> URL: https://issues.apache.org/jira/browse/KAFKA-15703
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15703) Update Highwatermark while building the remote log auxiliary state

2023-10-27 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15703:
-
Summary: Update Highwatermark while building the remote log auxiliary state 
 (was: Update Highwatermark while building the remote log auxilary state)

> Update Highwatermark while building the remote log auxiliary state
> --
>
> Key: KAFKA-15703
> URL: https://issues.apache.org/jira/browse/KAFKA-15703
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15703) Update Highwatermark while building the remote log auxillary state

2023-10-27 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15703:


 Summary: Update Highwatermark while building the remote log 
auxillary state
 Key: KAFKA-15703
 URL: https://issues.apache.org/jira/browse/KAFKA-15703
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15695) Local log start offset is not updated on the follower after rebuilding remote log auxiliary state

2023-10-26 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780166#comment-17780166
 ] 

Kamal Chandraprakash commented on KAFKA-15695:
--

[~nikramakrishnan]
I remember fixing this issue in 
[#14328|https://github.com/apache/kafka/pull/14328] and covering it via an 
integration test. Can you come up with a test that reproduces this issue? Thanks!

> Local log start offset is not updated on the follower after rebuilding remote 
> log auxiliary state
> -
>
> Key: KAFKA-15695
> URL: https://issues.apache.org/jira/browse/KAFKA-15695
> Project: Kafka
>  Issue Type: Bug
>  Components: replication, Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Nikhil Ramakrishnan
>Assignee: Nikhil Ramakrishnan
>Priority: Major
>  Labels: KIP-405, tiered-storage
> Fix For: 3.7.0
>
>
> In 3.6, the local log start offset is not updated when reconstructing the 
> auxiliary state of the remote log on a follower.
> The impact of this bug is significant because, if this follower becomes the 
> leader before the local log start offset has had a chance to be updated, 
> reads from any offset between [wrong log start offset; actual log start 
> offset] will be routed to the local storage, which does not contain the 
> corresponding data. Consumer reads will in this case never be satisfied.
>  
> Reproduction case:
>  # Create a cluster with 2 brokers, broker 0 and broker 1.
>  # Create a topic topicA with RF=2, 1 partition (topicA-0) and 2 batches per 
> segment, with broker 0 as the leader.
>  # Stop broker 1, and produce 3 records to topicA, such that segment 1 with 
> the first two records is copied to remote and deleted from local storage.
>  # Start broker 1, let it catch up with broker 0.
>  # Stop broker 0 such that broker 1 is elected as the leader, and try to 
> consume from the beginning of topicA-0.
> This consumer read will not be satisfied because the local log start offset 
> is not updated on broker 1 when it builds the auxiliary state of the remote 
> log segments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2023-10-25 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15682:
-
Description: 
One of the implementation of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
{{__remote_log_metadata}} to store the metadata about the remote log segments. 
Unlike other internal topics which are compaction enabled, this topic is not 
enabled with compaction and retention is set to unlimited. 

Keeping this internal topic retention to unlimited is not practical in real 
world use-case where the topic local disk usage footprint grow huge over a 
period of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on the assumption and need an assertion to ensure that the internal 
{{__remote_log_metadata}} segments are not eligible for deletion before the 
expiry of all the relevant user-topic uploaded remote-log-segments , otherwise 
there will be dangling remote-log-segments which won't be cleared once all the 
brokers are restarted post the internal topic retention cleanup.

  was:
One of the implementation of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
{{__remote_log_metadata}} to store the metadata about the remote log segments. 
Unlike other internal topics which are compaction enabled, this topic is not 
enabled with compaction and retention is set to unlimited. 

Keeping this internal topic retention to unlimited is not practical in real 
world use-case where the topic local disk usage footprint grow huge over a 
period of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on it and need an assertion before deleting the internal 
{{__remote_log_metadata}} segments, otherwise there will be dangling remote log 
segments which won't be cleared once all the brokers are restarted post the 
topic truncation.


> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> One of the implementation of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
> {{__remote_log_metadata}} to store the metadata about the remote log 
> segments. Unlike other internal topics which are compaction enabled, this 
> topic is not enabled with compaction and retention is set to unlimited. 
> Keeping this internal topic retention to unlimited is not practical in real 
> world use-case where the topic local disk usage footprint grow huge over a 
> period of time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on the assumption and need an assertion to ensure that the internal 
> {{__remote_log_metadata}} segments are not eligible for deletion before the 
> expiry of all the relevant user-topic uploaded remote-log-segments , 
> otherwise there will be dangling remote-log-segments which won't be cleared 
> once all the brokers are restarted post the internal topic retention cleanup.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2023-10-25 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15682:
-
Description: 
One of the implementation of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
{{__remote_log_metadata}} to store the metadata about the remote log segments. 
Unlike other internal topics which are compaction enabled, this topic is not 
enabled with compaction and retention is set to unlimited. 

Keeping this internal topic retention to unlimited is not practical in real 
world use-case where the topic local disk usage footprint grow huge over a 
period of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on it and need an assertion before deleting the internal 
{{__remote_log_metadata}} segments, otherwise there will be dangling remote log 
segments which won't be cleared once all the brokers are restarted post the 
topic truncation.

  was:
One of the implementation of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
{{__remote_log_metadata}}. Unlike other internal topics which are compaction 
enabled, this topic is not enabled with compaction and retention is set to 
unlimited. 

Keeping this internal topic retention to unlimited is not practical in real 
world use-case where the topic local disk usage footprint grow huge over a 
period of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on it and need an assertion before deleting the internal 
{{__remote_log_metadata}} segments, otherwise there will be dangling remote log 
segments which won't be cleared once all the brokers are restarted post the 
topic truncation.


> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> One of the implementation of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
> {{__remote_log_metadata}} to store the metadata about the remote log 
> segments. Unlike other internal topics which are compaction enabled, this 
> topic is not enabled with compaction and retention is set to unlimited. 
> Keeping this internal topic retention to unlimited is not practical in real 
> world use-case where the topic local disk usage footprint grow huge over a 
> period of time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on it and need an assertion before deleting the internal 
> {{__remote_log_metadata}} segments, otherwise there will be dangling remote 
> log segments which won't be cleared once all the brokers are restarted post 
> the topic truncation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2023-10-25 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15682:
-
Description: 
One of the implementation of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
{{__remote_log_metadata}}. Unlike other internal topics which are compaction 
enabled, this topic is not enabled with compaction and retention is set to 
unlimited. 

Keeping this internal topic retention to unlimited is not practical in real 
world use-case where the topic local disk usage footprint grow huge over a 
period of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on it and need an assertion before deleting the internal 
{{__remote_log_metadata}} segments, otherwise there will be dangling remote log 
segments which won't be cleared once all the brokers are restarted post the 
topic truncation.

  was:
One of the implementation of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
{{__remote_log_metadata}}. Unlike other internal topics which are compaction 
enabled, this topic is not enabled with compaction and retention is set to 
unlimited. 

Keeping this internal topic retention to unlimited is not practical in real 
world use-case where the topic disk usage footprint grows large over a period 
of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on it and need an assertion before deleting the internal 
{{__remote_log_metadata}} segments, otherwise there will be dangling remote log 
segments which won't be cleared once all the brokers are restarted post the 
topic truncation.


> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> One of the implementation of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
> {{__remote_log_metadata}}. Unlike other internal topics which are compaction 
> enabled, this topic is not enabled with compaction and retention is set to 
> unlimited. 
> Keeping this internal topic retention to unlimited is not practical in real 
> world use-case where the topic local disk usage footprint grow huge over a 
> period of time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on it and need an assertion before deleting the internal 
> {{__remote_log_metadata}} segments, otherwise there will be dangling remote 
> log segments which won't be cleared once all the brokers are restarted post 
> the topic truncation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2023-10-25 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15682:
-
Description: 
One of the implementation of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
{{__remote_log_metadata}}. Unlike other internal topics which are compaction 
enabled, this topic is not enabled with compaction and retention is set to 
unlimited. 

Keeping this internal topic retention to unlimited is not practical in real 
world use-case where the topic disk usage footprint grows large over a period 
of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on it and need an assertion before deleting the internal 
{{__remote_log_metadata}} segments, otherwise there will be dangling remote log 
segments which won't be cleared once all the brokers are restarted post the 
topic truncation.

  was:
One of the implementation of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic. Unlike 
other internal topics which are compaction enabled, this topic is not enabled 
with compaction and retention is set to unlimited. 

Keeping this internal topic retention to unlimited is not practical in real 
world use-case where the topic disk usage footprint grows large over a period 
of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on it and need an assertion before deleting the internal 
{{__remote_log_metadata}} segments, otherwise there will be dangling remote log 
segments which won't be cleared once all the brokers are restarted post the 
topic truncation.


> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> One of the implementation of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic 
> {{__remote_log_metadata}}. Unlike other internal topics which are compaction 
> enabled, this topic is not enabled with compaction and retention is set to 
> unlimited. 
> Keeping this internal topic retention to unlimited is not practical in real 
> world use-case where the topic disk usage footprint grows large over a period 
> of time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on it and need an assertion before deleting the internal 
> {{__remote_log_metadata}} segments, otherwise there will be dangling remote 
> log segments which won't be cleared once all the brokers are restarted post 
> the topic truncation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2023-10-25 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15682:


 Summary: Ensure internal remote log metadata topic does not expire 
its segments before deleting user-topic segments
 Key: KAFKA-15682
 URL: https://issues.apache.org/jira/browse/KAFKA-15682
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash


One of the implementation of RemoteLogMetadataManager is 
TopicBasedRemoteLogMetadataManager which uses an internal Kafka topic. Unlike 
other internal topics which are compaction enabled, this topic is not enabled 
with compaction and retention is set to unlimited. 

Keeping this internal topic retention to unlimited is not practical in real 
world use-case where the topic disk usage footprint grows large over a period 
of time. 

It is assumed that the user will set the retention to a reasonable time such 
that it is the max of all the user-created topics (max + X). We can't just rely 
on it and need an assertion before deleting the internal 
{{__remote_log_metadata}} segments, otherwise there will be dangling remote log 
segments which won't be cleared once all the brokers are restarted post the 
topic truncation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15660) File-based Tiered Storage should delete folders upon topic deletion

2023-10-21 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778040#comment-17778040
 ] 

Kamal Chandraprakash commented on KAFKA-15660:
--

In RemoteStorageManager, we don't have an API to differentiate between deleting 
remote log segments due to a retention breach and due to topic deletion. So, the 
folder will be left behind when deleting the topic in the LocalTieredStorage 
implementation.
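
For context, a purely hypothetical sketch of what a reason-aware delete API 
could look like; nothing like this exists in the current RemoteStorageManager 
interface (which only receives the segment metadata), and the types below are 
simplified placeholders rather than Kafka's real ones.
{code:java}
public interface ReasonAwareRemoteStorage {

    enum DeletionReason { RETENTION_BREACH, TOPIC_DELETION }

    // Shape of today's contract: the plugin cannot tell why the segment is deleted.
    void deleteLogSegmentData(String remoteLogSegmentId) throws Exception;

    // Hypothetical extension: a file-based plugin could remove the whole topic
    // directory when reason == TOPIC_DELETION, avoiding the leftover folders above.
    default void deleteLogSegmentData(String remoteLogSegmentId, DeletionReason reason) throws Exception {
        deleteLogSegmentData(remoteLogSegmentId);
    }
}
{code}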

> File-based Tiered Storage should delete folders upon topic deletion
> ---
>
> Key: KAFKA-15660
> URL: https://issues.apache.org/jira/browse/KAFKA-15660
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Christo Lolov
>Priority: Minor
>
> We have added a quick-start guide for Tiered Storage as part of Apache Kafka 
> 3.6 - [https://kafka.apache.org/documentation/#tiered_storage_config_ex.]
> When interacting with it, however, it appears that when topics are deleted, 
> the remote segments and their indices are deleted but the folders are not:
> {code:java}
> > ls /tmp/kafka-remote-storage/kafka-tiered-storage 
> A-0-ApBdPOE1SOOw-Ie8RQLuAA  B-0-2omLZKw1Tiu2-EUKsIzj9Q  
> C-0-FXdccGWXQJCj-RQynsOK3Q  D-0-vqfdYtYLSlWEyXp6cwwmpg
> > ls /tmp/kafka-remote-storage/kafka-tiered-storage/A-0-ApBdPOE1SOOw-Ie8RQLuAA
> > bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
> __remote_log_metadata{code}
> I think that the file-based implementation shipping with Kafka should delete 
> the folders as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-18 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776999#comment-17776999
 ] 

Kamal Chandraprakash commented on KAFKA-15620:
--

[~h...@pinterest.com]

What was the impact of this bug? Going by the code, it logs a single warning 
and proceeds. 

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> Use the newly released 3.6.0, turn on tiered storage feature: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Adding some data to the 
> topic and the data is copied to remote storage.  After a few days when the 
> log segment is removed from remote storage due to log retention expiration, 
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set remove the segment 
> from the metadata cache and this caused the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. And traced the log to where the log retention kicked in and saw *there 
> were two delete log segment generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
>  
> You can see there are 2 

[jira] [Comment Edited] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-18 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776822#comment-17776822
 ] 

Kamal Chandraprakash edited comment on KAFKA-15620 at 10/18/23 6:09 PM:


[~h...@pinterest.com] 

This issue was fixed in KAFKA-15479.


was (Author: ckamal):
[~h...@pinterest.com] 

This issue was fixed in 
[KAFKA-15479|https://issues.apache.org/jira/browse/KAFKA-15479] 

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.6.1
>
>
> Use the newly released 3.6.0, turn on tiered storage feature: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Adding some data to the 
> topic and the data is copied to remote storage.  After a few days when the 
> log segment is removed from remote storage due to log retention expiration, 
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set remove the segment 
> from the metadata cache and this caused the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. And traced the log to where the log retention kicked in and saw *there 
> were two delete log segment generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 

[jira] [Updated] (KAFKA-15632) Drop the invalid remote log metadata events

2023-10-18 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15632:
-
Description: 
{{__remote_log_metadata}} topic cleanup policy is set to {{DELETE}} and the default 
retention is unlimited.

The expectation is that the user will configure this internal topic's retention 
time to be higher than that of all the other user-created topics in the cluster. 
We cannot keep it unlimited, as the contents of this internal topic need to stay 
in local storage.

The RemoteLogMetadata cache expects the events to arrive in the order defined by 
[RemoteLogSegmentState#isValidTransition|https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java#L93]

Once retention expires for this topic, say after 30 days due to a size/time 
breach, there can be partial metadata events and the 
[cache|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java#L160]
 starts to throw RemoteResourceNotFoundException.

  was:
{{__remote_log_metadata}} topic cleanup policy is set to {{DELETE}} and default 
retention is set to unlimited.

The expectation is that the user will configure the maximum retention time for 
this internal topic compared to all the other user created topics in the 
cluster. We cannot keep it to unlimited as the contents of this internal topic 
need to be in the local storage.

RemoteLogMetadata cache expect the events to be in the order of 
[RemoteLogSegmentState#isValidTransition|https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java#L93]

Once the retention time got expired for this topic say after 30 days due to 
breach by retention size/time, then there can be partial metadata events and 
the 
[cache|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java#L160]
 start to throw RemoteResourceNotFoundError.


> Drop the invalid remote log metadata events 
> 
>
> Key: KAFKA-15632
> URL: https://issues.apache.org/jira/browse/KAFKA-15632
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> {{__remote_log_metadata}} topic cleanup policy is set to {{DELETE}} and the 
> default retention is unlimited.
> The expectation is that the user will configure this internal topic's retention 
> time to be higher than that of all the other user-created topics in the 
> cluster. We cannot keep it unlimited, as the contents of this internal topic 
> need to stay in local storage.
> The RemoteLogMetadata cache expects the events to arrive in the order defined by 
> [RemoteLogSegmentState#isValidTransition|https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java#L93]
> Once retention expires for this topic, say after 30 days due to a size/time 
> breach, there can be partial metadata events and the 
> [cache|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java#L160]
>  starts to throw RemoteResourceNotFoundException.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15632) Drop the invalid remote log metadata events

2023-10-18 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15632:


 Summary: Drop the invalid remote log metadata events 
 Key: KAFKA-15632
 URL: https://issues.apache.org/jira/browse/KAFKA-15632
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


{{__remote_log_metadata}} topic cleanup policy is set to {{DELETE}} and the default 
retention is unlimited.

The expectation is that the user will configure this internal topic's retention 
time to be higher than that of all the other user-created topics in the cluster. 
We cannot keep it unlimited, as the contents of this internal topic need to stay 
in local storage.

The RemoteLogMetadata cache expects the events to arrive in the order defined by 
[RemoteLogSegmentState#isValidTransition|https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java#L93]

Once retention expires for this topic, say after 30 days due to a size/time 
breach, there can be partial metadata events and the 
[cache|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java#L160]
 starts to throw RemoteResourceNotFoundException.
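
As a rough sketch of the "drop" idea (not the actual patch), the consumer-side 
handler could catch the failure that such partial events trigger today and skip 
the event instead of surfacing it. The class below is illustrative and only 
reuses types that appear in the stack traces on the related tickets.

{code:java}
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative helper only, not part of Kafka.
class DroppingMetadataUpdateHandler {
    private static final Logger log = LoggerFactory.getLogger(DroppingMetadataUpdateHandler.class);

    void apply(RemoteLogMetadataCache cache, RemoteLogSegmentMetadataUpdate update) {
        try {
            cache.updateRemoteLogSegmentMetadata(update);
        } catch (RemoteResourceNotFoundException e) {
            // The earlier COPY_SEGMENT_* events for this segment were already truncated
            // from __remote_log_metadata, so this update can never become valid. Drop it.
            log.info("Dropping stale remote log metadata event {}: {}", update, e.getMessage());
        }
    }
}
{code}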



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-18 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776822#comment-17776822
 ] 

Kamal Chandraprakash commented on KAFKA-15620:
--

[~h...@pinterest.com] 

This issue was fixed in 
[KAFKA-15479|https://issues.apache.org/jira/browse/KAFKA-15479] 

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.6.1
>
>
> Use the newly released 3.6.0, turn on tiered storage feature: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Adding some data to the 
> topic and the data is copied to remote storage.  After a few days when the 
> log segment is removed from remote storage due to log retention expiration, 
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set remove the segment 
> from the metadata cache and this caused the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. And traced the log to where the log retention kicked in and saw *there 
> were two delete log segment generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
>  
> You can see there are 2 leader epochs in this 

[jira] [Commented] (KAFKA-15525) Segment uploads stop working following a broker failure

2023-10-02 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17771065#comment-17771065
 ] 

Kamal Chandraprakash commented on KAFKA-15525:
--

While uploading a segment, the RemoteLogManager publishes an event to the 
internal metadata topic; if that topic is unavailable, the segment cannot be 
uploaded. {{rlmm.config.remote.log.metadata.topic.replication.factor}} is set to 
1; can you try increasing the replication factor to 3 or 4?
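
For example (the value is illustrative, and this property only applies when the 
internal topic is created; for an already existing topic, raising the replication 
factor of its partitions would need a partition reassignment rather than only 
this setting):

{code:java}
# Broker-side RLMM property (illustrative value): keep the internal
# __remote_log_metadata topic as durable as the other internal topics.
rlmm.config.remote.log.metadata.topic.replication.factor=3
{code}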

> Segment uploads stop working following a broker failure
> ---
>
> Key: KAFKA-15525
> URL: https://issues.apache.org/jira/browse/KAFKA-15525
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Francois Visconte
>Priority: Major
>
> I have a tiered-storage enabled cluster and topic where I continuously 
> produce and consume to/from a TS-enabled topic on that cluster.
> Here are the topic settings I’m using: 
> {code:java}
> local.retention.ms=12
> remote.storage.enable=true
> retention.ms: 1080
> segment.bytes: 51200
> {code}
> Here are my broker settings:
> {code:java}
> remote.log.storage.system.enable=true
> remote.log.storage.manager.class.path=/opt/kafka/tiered-storage-libs/*
> remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
> remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
> remote.log.metadata.manager.listener.name=INTERNAL_PLAINTEXT
> remote.log.manager.task.interval.ms=5000
> remote.log.manager.thread.pool.size=10
> remote.log.reader.threads=10
> remote.log.reader.max.pending.tasks=100
> rlmm.config.remote.log.metadata.topic.replication.factor=1
> rlmm.config.remote.log.metadata.topic.num.partitions=50
> rlmm.config.remote.log.metadata.topic.retention.ms=-1
> rsm.config.chunk.cache.class=io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache
> rsm.config.chunk.cache.path=/data/tiered-storage-cache
> rsm.config.chunk.cache.size=1073741824
> rsm.config.metrics.recording.level=DEBUG    
> rsm.config.storage.aws.credentials.provider.class=software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider
> rsm.config.storage.backend.class.name=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
> rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
> rsm.config.storage.s3.region=us-east-1
> rsm.config.chunk.size=102400
> rsm.config.storage.s3.multipart.upload.part.size=16777216 {code}
> When a broker in the cluster get rotated (replaced or restarted) some brokers 
> start throwing this error repeatedly: 
> {code:java}
> [RemoteLogManager=1 partition=yTypIvtBRY2l3sD4-8M7fA:loadgen-3] Error 
> occurred while copying log segments of partition: 
> yTypIvtBRY2l3sD4-8M7fA:loadgen-3 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.KafkaException: 
> java.util.concurrent.TimeoutException: Timed out in catching up with the 
> expected offset by consumer.
>     at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>     at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>     at 
> kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegment(RemoteLogManager.java:728)
>     at 
> kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegmentsToRemote(RemoteLogManager.java:687)
>     at 
> kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.java:790)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at 
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>     at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: org.apache.kafka.common.KafkaException: 
> java.util.concurrent.TimeoutException: Timed out in catching up with the 
> expected offset by consumer.
>     at 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:188)
>     at 
> java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
>     at 
> java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
>     at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>     at 
> 
