Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-14 Thread via GitHub


satishd merged PR #15060:
URL: https://github.com/apache/kafka/pull/15060


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-14 Thread via GitHub


satishd commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1891112974

   Merging it to trunk as the test failures are unrelated to this change.





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-13 Thread via GitHub


satishd commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1890811313

   @iit2009060 As we discussed earlier in the 
[comment](https://github.com/apache/kafka/pull/15060#issuecomment-1882487316), 
this issue applies only to the compact policy; the behaviour under the 
retention policy is as expected.
   
   As I mentioned in my earlier 
[comment](https://github.com/apache/kafka/pull/15060#issuecomment-1880861009), 
I am fine with merging this PR, which partially addresses the compact-policy 
problem by iterating through the remote log segments, and we can have a 
follow-up PR for the remaining cases.
   
   We can discuss the possible cases and solutions in 
[KAFKA-15388](https://issues.apache.org/jira/browse/KAFKA-15388).





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-12 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1890348438

   @satishd @kamalcph @divijvaidya I am able to reproduce the issue which 
@satishd mentioned; I needed to introduce a delay in the movement of segments 
to remote storage through a hacky code change.
   
   1. Created topic test10.
   2. Set segment.bytes=100 so that each segment contains only one offset.
   3. Set the cleanup policy to compact.
   4. Produced some messages (1:1,1:2,1:3,1:4,1:5,1:6,1:7).
   5. Log compaction ran.
   6. Deleted the compact cleanup policy from the topic.
   7. Enabled remote.storage.enable=true.
   
   I introduced code in the RemoteLogManager that copies only the remote 
segments whose base offset <= 1, so that we end up in a situation where the 
data is not available in the remote segments but is available in the 
local/active segments.
   [screenshot]
   
   8. In the local log, segments 0 and 1 have been removed and moved to 
remote log storage, where the number of bytes is zero because the data was 
removed by log compaction. Observe that the local segments contain data only 
for offsets >= 2.
   **Local log segments view**
   [screenshot]
   **Remote log segments view**
   [screenshot]
   The remote log contains only two segments, 0 and 1, both empty because of 
log compaction.
   9. Execute the console consumer:
   `bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
test10 --offset 0 --partition 0 --property print.offset=true --property 
print.key=true --property print.value=true --consumer-property 
auto.offset.reset=earliest`
   This command never proceeds, as the RemoteLogManager returns empty records 
and we do not yet have a mechanism to handle the case where the 
RemoteLogManager returns empty records.
   [screenshot]
   
   @satishd @divijvaidya @kamalcph Let me know if you have a suggestion to 
fix this use case.
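   The stall described above can be sketched with a small simulation 
(illustrative only, not Kafka code; the offsets and helpers are hypothetical): 
the fetch position advances only past records that were actually returned, so 
an empty remote read leaves the consumer re-requesting the same offset.

```python
# Illustrative simulation (not Kafka code) of the stall described above:
# remote segments 0 and 1 exist but are empty after log compaction, the
# local log starts at offset 2, and the fetch loop only advances past
# records it actually received. All names here are hypothetical.

LOCAL_LOG_START = 2  # assumed layout: offsets below 2 live only in empty remote segments


def fetch(offset: int) -> list[int]:
    """Return the records at `offset`; empty for the compacted-away remote range."""
    if offset < LOCAL_LOG_START:
        return []        # remote read finds no batch -> empty records response
    return [offset]      # local read returns one record per offset


def consume(start_offset: int, max_polls: int) -> int:
    """Poll `max_polls` times; return the final fetch position."""
    offset = start_offset
    for _ in range(max_polls):
        records = fetch(offset)
        if records:
            offset = records[-1] + 1  # advance only when records came back
    return offset


# Starting at offset 0 the position never moves: every poll re-fetches the
# same compacted-away offset and gets an empty response back.
print(consume(0, 10))  # -> 0
print(consume(2, 3))   # -> 5
```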
   
   





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-12 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1888750316

   > https://issues.apache.org/jira/browse/KAFKA-16088
   
   Thanks @kamalcph for the discussion.
   @satishd I discussed the above cases with @kamalcph:
   1. If auto.offset.reset is **latest**, we can still lose messages in the 
case mentioned above; that is the expected behaviour with the "**latest**" 
configuration.
   2. If auto.offset.reset is **earliest**, the issue never occurs.
   
   @satishd We can merge the current PR if it is feasible. I will continue to 
work on reproducing the log-compaction case you mentioned, and will open a 
separate PR once the issue is reproducible.
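   For reference, the two reset behaviours under discussion can be sketched 
as follows (a simplification of the consumer's `auto.offset.reset` handling; 
the function and offsets are illustrative, not the client implementation):

```python
# Sketch of how auto.offset.reset resolves an out-of-range fetch offset.
# This is a simplification for discussion, not the consumer client's code.

def reset_offset(policy: str, log_start: int, log_end: int) -> int:
    if policy == "earliest":
        return log_start  # re-read everything still retained: nothing skipped
    if policy == "latest":
        return log_end    # jump to the end: records in [start, end) are never read
    raise ValueError("no reset policy: surface the error to the application")

# A consumer stuck at an out-of-range offset with log start 6 and log end 10:
print(reset_offset("earliest", 6, 10))  # -> 6 (no loss among retained records)
print(reset_offset("latest", 6, 10))    # -> 10 (offsets 6-9 are skipped: "lost")
```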
   





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-09 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1882690345

   > @iit2009060 In step 7, it is anticipated that setting the fetch offset to 
1 should follow the specified process to trigger an `OutOfOffsetRangeError` 
with empty records as I mentioned 
[earlier](https://github.com/apache/kafka/pull/15060#issuecomment-1882487316).
   > 
   > > I will try with from-beginning option and update the behaviour.
   > 
   > Sure.
   > 
   > > Even for the latest configuration it should work?
   > 
   > `latest` option also should work. To validate this, run the step with the 
`latest` setting and concurrently run `kafka-console-producer`, generating 
additional messages beyond the latest offset where the `OutOfOffsetRangeError` 
was encountered. Subsequently, the consumer should be able to consume the newly 
produced sequence of messages.
   
   @satishd I will try to write an integration or unit test that depicts the 
scenario I mentioned. However, as far as I noticed, the additional messages I 
produced in step 5 could not be retrieved with the fetch request at offset 1, 
but the request at offset 6 was able to proceed.





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-09 Thread via GitHub


satishd commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1882660480

   @iit2009060 In step 7, it is anticipated that setting the fetch offset to 1 
should follow the specified process to trigger an `OutOfOffsetRangeError` with 
empty records as I mentioned 
[earlier](https://github.com/apache/kafka/pull/15060#issuecomment-1882487316).
   
   >I will try with from-beginning option and update the behaviour.
   
   Sure.
   
   >Even for the latest configuration it should work?
   
   `latest` option also should work. To validate this, run the step with the 
`latest` setting and concurrently run `kafka-console-producer`, generating 
additional messages beyond the latest offset where the `OutOfOffsetRangeError` 
was encountered. Subsequently, the consumer should be able to consume the newly 
produced sequence of messages.





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-08 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1882517442

   > @iit2009060 The behaviour that you mentioned in the 
[comment](https://github.com/apache/kafka/pull/15060#issuecomment-1879657273) 
seems to be working as expected. The client seems to be receiving 
OffsetOutOfRangeException. When a consumer client sends a request for fetch 
offset that is out of range then it receives offset out of range error and it 
sends the next request based on `auto-offset-reset` strategy that is set on the 
consumer client. It seems the default value for `kafka-console-consumer` is 
`latest` if not set through an `auto.offset.reset` property or does not use 
`--from-beginning` argument. In your case, it may be trying to fetch the latest 
offset in the next request and there are no messages published to the topic 
beyond that offset as there may not be any more messages published after that 
offset. You can try running the same scenario of fetching out of range offset 1 
with `--from-beginning` and observe the behavior.
   > 
   > We can syncup offline to understand your scenario better.
   
   @satishd Check the **5th step** mentioned in the 
[comment](https://github.com/apache/kafka/pull/15060#issuecomment-1879657273), 
where I produced some data. In the **7th step** I made offset requests with 
both 1 and 6; offset 6 worked fine, but the request with offset 1 goes 
through these steps:
   1. It handles OutOfOffSetRangeError.
   2. It goes through the if condition mentioned in **step 8**.
   3. It then falls into the else branch:
   `} else {
     createLogReadResult(exception)
   }`
   4. As shown in the **step 9** screenshot, it creates an empty record 
response with the exception wrapped in the LogReadResult.
   
   The default configuration in the console consumer is **latest**. Should it 
work even with the **latest** configuration?
   I will try the from-beginning option and update on the behaviour.
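   The branching above can be sketched like this (a simplification of the 
read path being discussed; the guard condition and names are assumptions for 
illustration, not the exact Scala source):

```python
# Simplified sketch of the branch described above: an out-of-range fetch
# either becomes a delayed remote fetch or is answered with empty records
# plus the wrapped exception. The guard condition here is an assumption.

def handle_out_of_range(fetch_offset: int, log_start_offset: int,
                        local_log_start_offset: int) -> str:
    # assumed guard ("step 8"): the offset may still exist in tiered storage
    if log_start_offset <= fetch_offset < local_log_start_offset:
        return "delayed-remote-fetch"
    # the else branch quoted above: createLogReadResult(exception) wraps the
    # exception in a LogReadResult carrying empty records
    return "empty-records-with-exception"

# Offset 1 with logStartOffset 6 (as in the earlier repro) takes the else
# branch, so the consumer receives empty records:
print(handle_out_of_range(1, 6, 8))   # -> empty-records-with-exception
print(handle_out_of_range(7, 6, 8))   # -> delayed-remote-fetch
```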
   





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-08 Thread via GitHub


satishd commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1882487316

   @iit2009060 
   The behaviour that you mentioned in the 
[comment](https://github.com/apache/kafka/pull/15060#issuecomment-1879657273) 
seems to be working as expected. The client seems to be receiving 
OffsetOutOfRangeException. When a consumer client sends a fetch request for 
an offset that is out of range, it receives an offset-out-of-range error and 
sends the next request based on the `auto.offset.reset` strategy set on the 
consumer client. The default value for `kafka-console-consumer` seems to be 
`latest` when it is not set through an `auto.offset.reset` property and the 
`--from-beginning` argument is not used.
   In your case, the consumer may be trying to fetch the latest offset in the 
next request, and there are no messages published to the topic beyond that 
offset. You can try running the same scenario of fetching the out-of-range 
offset 1 with `--from-beginning` and observe the behavior.
   
   We can sync up offline to understand your scenario better.





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-08 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1880890006

   > @iit2009060 
[KAFKA-15388](https://issues.apache.org/jira/browse/KAFKA-15388) is about 
addressing the fetch requests for tiered storage enabled topics that have 
compacted offsets in a partition before their cleanup policy is changed to 
delete. This PR only covers if the next offset is available only within the 
remote log segments. But there is a case when the offset is available in the 
subsequent local segments(not only in active local segments but in any of the 
local only segments) that have not yet been copied to remote storage. The 
solution does not address 
[KAFKA-15388](https://issues.apache.org/jira/browse/KAFKA-15388) completely.
   > 
   > I am fine to merge this and have a followup PR for the mentioned scenario 
to resolve [KAFKA-15388](https://issues.apache.org/jira/browse/KAFKA-15388) 
addressing the compacted topics before tiered storage is enabled.
   
   @satishd Yes, as you mentioned, the current PR does not cover reading from 
local log segments, i.e. the case where the data is not available in the 
remote log segments, which can happen because of:
   1. Log compaction
   2. Retention
   
   The above scenario touches multiple features and is not specifically an 
issue with log compaction. The intention behind creating a separate 
[ticket](https://issues.apache.org/jira/browse/KAFKA-16088) is to provide a 
dedicated space for addressing the broader issue, encompassing scenarios 
beyond the scope of the current PR.
   However, I am OK with a follow-up PR if we want to merge this one.





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-08 Thread via GitHub


satishd commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1880861009

   @iit2009060 KAFKA-15388 is about addressing fetch requests for tiered 
storage enabled topics that have compacted offsets in a partition before 
their cleanup policy is changed to delete. This PR only covers the case where 
the next offset is available within the remote log segments. But there is a 
case where the offset is available in the subsequent local segments (not only 
in the active segment but in any of the local-only segments) that have not 
yet been copied to remote storage. So the solution does not address 
KAFKA-15388 completely.
   
   I am fine with merging this and having a follow-up PR for the mentioned 
scenario to resolve KAFKA-15388, addressing topics compacted before tiered 
storage is enabled.





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-08 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1880775216

   > > @satishd @divijvaidya @kamalcph I am able to reproduce the above 
scenario using retention feature instead of log compaction.
   > > The overall problem is we are sending MemoryRecords.Empty when unable 
to find offset even though active segments can still have the data.
   > > Consider the below scenario
   > > 
   > > 1. create topic test8 with partition 0 and with remote storage enabled.
   > > 2. Current status of topic (Offset 0,2,3,4)
   > > [screenshot]
   > > 3. When we make a fetch request with offset 1 [screenshot]
   > > 4. To delete remote segments I have set up the configuration 
retention.ms=1000 [screenshot]
   > > 5. Now once the data is deleted, reset the local and remote retention 
to 1 hour. Produce some data [screenshot]
   > > 7. When I am trying to fetch offset 1 from the topic test8 partition 
0, it is never able to respond. Ideally it should pick the data from the 
active segment whose offset starts at 6.

Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-07 Thread via GitHub


kamalcph commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1879987059

   > In general this is happening because The next fetch request offset 
increments only when there is a record list. But the Remote fetch 
implementation by default if not able to find the records sents Empty Records.
   
   An empty records response is sent back to the consumer so that it can 
retry the request if needed. In a non-compacted topic, the records are 
expected to be present in a contiguous manner.
   
   Shall we list the scenarios where the record offsets won't be contiguous 
for a regular (non-compacted) topic, or write a unit test to reproduce this 
problem?





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-06 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1879980873

   > @satishd @divijvaidya @kamalcph I am able to reproduce the above 
scenario using retention feature instead of log compaction.
   > 
   > The overall problem is we are sending MemoryRecords.Empty when unable to 
find offset even though active segments can still have the data.
   > 
   > Consider the below scenario
   > 
   > 1. create topic test8 with partition 0 and with remote storage enabled.
   > 2. Current status of topic (Offset 0,2,3,4)
   > [screenshot]
   > 3. When we make a fetch request with offset 1 [screenshot]
   > 4. To delete remote segments I have set up the configuration 
retention.ms=1000 [screenshot]
   > 5. Now once the data is deleted, reset the local and remote retention to 
1 hour. Produce some data [screenshot]
   > 7. When I am trying to fetch offset 1 from the topic test8 partition 0, 
it is never able to respond. Ideally it should pick the data from the active 
segment whose offset starts at 6.

Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-06 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1879657273

   @satishd @divijvaidya @kamalcph I am able to reproduce the above scenario 
using the retention feature instead of log compaction.
   
   The overall problem is that we send MemoryRecords.Empty when unable to 
find the offset, even though the active segments can still have the data.
   
   Consider the below scenario:
   1. Create topic test8 with partition 0 and with remote storage enabled.
   2. Current status of the topic (offsets 0,2,3,4):
   [screenshot]
   3. We make a fetch request with offset 1.
   [screenshot]
   4. To delete remote segments I have set the configuration 
retention.ms=1000.
   [screenshot]
   5. Once the data is deleted, reset the local and remote retention to 1 
hour.
   6. Produce some data.
   [screenshot]
   7. When I try to fetch offset 1 from topic test8 partition 0, it is never 
able to respond. Ideally it should pick the data from the active segment 
whose offset starts at 6.
   [screenshot]
   8. This happens because the requested offset 1 is not greater than 
log.logStartOffset, which is 6.
   [screenshot]
   9. We create an empty record response.
   [screenshot]
   
   In general this happens because the next fetch request offset increments 
only when there is a record list, but the remote fetch implementation, when 
unable to find the records, sends back empty records by default.
   
   We should create a separate JIRA to track this. The issue is not specific 
to log compaction; it will happen whenever we send empty records from remote 
storage.
   
   
   
   





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-04 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877542832

   > @iit2009060 You may have mistyped as controller but it does not have any 
role in the fetch path here. Remote fetch is executed through a specific 
purgatory with DelayedRemoteFetch and read operation/callback is executed in a 
specific thread pool. It returns empty records when there is no data for a 
specific offset in the remote storage.
   
   Yes @satishd, correct. I am still going through the code but need a few 
inputs:
   1. If we send empty records, how does the client determine the next offset 
to be fetched? In LogOffsetMetadata we always return the requested fetch 
offset. Or does it keep requesting the same fetch offset?





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-04 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877528171

   > Thanks @iit2009060 for the PR.
   > 
   > Let us say there are two segments in remote storage and subsequents 
segments in local storage. remote-seg-10[10, 20], remote-seg-21[21, 30] : 
offsets 25 to 30 are compacted. local-seg-31[31, 40]
   > 
   > When a fetch request comes for offsets with in [25, 30] then it should 
move to the local segment as those offsets might have been compacted earlier. 
Did you also cover this scenario in this PR?
   
   @satishd I am trying to reproduce the case where the last offsets of a 
segment earlier than the active segment are compacted away.
   I tried locally but was not able to reproduce the above scenario. There 
always exists a last offset in each segment earlier than the active segment.
   As per the article 
https://towardsdatascience.com/log-compacted-topics-in-apache-kafka-b1aa1e4665a7, 
records in the active segment are never compacted.
   @satishd Do you have steps in mind to regenerate the above scenario?
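   The segment layout from the quoted scenario can be sketched to show the 
lookup a fix would need (hypothetical data model; the helper is illustrative, 
not RemoteLogManager code):

```python
# Hypothetical data model for the quoted scenario: remote-seg-10 covers
# [10, 20], remote-seg-21 covers [21, 30] but offsets 25-30 were compacted
# away, and local-seg-31 covers [31, 40].

REMOTE_SEGMENTS = [
    (10, list(range(10, 21))),        # remote-seg-10: offsets 10..20
    (21, list(range(21, 25))),        # remote-seg-21: 25..30 compacted away
]
LOCAL_SEGMENTS = [
    (31, list(range(31, 41))),        # local-seg-31: offsets 31..40
]


def next_available_offset(fetch_offset: int):
    """First offset >= fetch_offset, searching remote then local segments."""
    for _base, offsets in REMOTE_SEGMENTS + LOCAL_SEGMENTS:
        for o in offsets:
            if o >= fetch_offset:
                return o
    return None


# A fetch inside the compacted range [25, 30] should land on offset 31 in the
# local segment rather than coming back as empty records:
print(next_available_offset(27))  # -> 31
print(next_available_offset(15))  # -> 15
```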





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-04 Thread via GitHub


satishd commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877333095

   @iit2009060 You may have mistyped it as controller, but the controller does 
not have any role in the fetch path here. 
   Remote fetch is executed through a specific purgatory with 
DelayedRemoteFetch, and the read operation/callback is executed in a specific 
thread pool. It returns empty records when there is no data for a specific 
offset in the remote storage.





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-03 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1876338999

   > Thanks @iit2009060 for the PR.
   > 
   > Let us say there are two segments in remote storage and subsequent 
segments in local storage. remote-seg-10[10, 20], remote-seg-21[21, 30] : 
offsets 25 to 30 are compacted. local-seg-31[31, 40]
   > 
   > When a fetch request comes for offsets within [25, 30] then it should 
move to the local segment as those offsets might have been compacted earlier. 
Did you also cover this scenario in this PR?
   
   @satishd  I have not tested this case explicitly. 
   In this case RemoteLogManager would return firstBatch as null, and the 
controller (the class which is invoking the RemoteLogManager read) should take 
care of reading the next segment locally. Let me reproduce this issue locally 
and update the behaviour. 





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440153281


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
 }
 }
+// for testing
+public RemoteLogInputStream getRemoteLogInputStream(InputStream in) {

Review Comment:
   @kamalcph Yes sure. Updated.






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


kamalcph commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440146317


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
 }
 }
+// for testing
+public RemoteLogInputStream getRemoteLogInputStream(InputStream in) {

Review Comment:
   @iit2009060  can we remove the `public` method access specifier?






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1874937679

   > LGTM, thanks for the patch!
   > 
   > This patch handles the FETCH request for previously compacted topic 
segments uploaded to remote storage. We also have to go through the
   > 
   > 1. `upload`, `deletion` path  and
   > 2. RemoteLogMetadataCache internal state
   > 
   > for both normal and unclean-leader-election scenarios.
   https://issues.apache.org/jira/browse/KAFKA-15388 
   As mentioned in the description, the upload and delete paths will not be 
impacted. 





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440133601


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is 
greater than or equal to the target offset
-remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   done






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440126550


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2065,6 +2066,11 @@ public Optional 
fetchRemoteLogSegmentMetadata(TopicPar
 return Optional.of(segmentMetadata);
 }
 
+public Optional 
findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+  
Option leaderEpochFileCacheOption) {
+return Optional.ofNullable(null);

Review Comment:
   done.



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2161,6 +2167,85 @@ RecordBatch findFirstBatch(RemoteLogInputStream 
remoteLogInputStream, long offse
 }
 }
 
+@Test
+public void testReadForFirstBatchInLogCompaction() throws 
RemoteStorageException, IOException {
+FileInputStream fileInputStream = mock(FileInputStream.class);
+RemoteLogInputStream remoteLogInputStream = 
mock(RemoteLogInputStream.class);
+ClassLoaderAwareRemoteStorageManager rsmManager = 
mock(ClassLoaderAwareRemoteStorageManager.class);
+RemoteLogSegmentMetadata segmentMetadata = 
mock(RemoteLogSegmentMetadata.class);
+LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+
when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), 
anyInt()))

Review Comment:
   done






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440116425


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
 }
 }
+// for testing
+public RemoteLogInputStream getRemoteLogInputStream(InputStream in) {

Review Comment:
   @kamalcph  Do we really see a need for passing an actual 
ByteArrayInputStream? It will make it difficult to mock the behaviour.






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440115636


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
 }
 }
+// for testing
+public RemoteLogInputStream getRemoteLogInputStream(InputStream in) {

Review Comment:
   @kamalcph  I want to mock the RemoteLogInputStream so that I can invoke and 
mock the nextBatch function accordingly. In the first iteration I want 
firstBatch to be null, and in the next iteration firstBatch to be available.
   `   when(remoteLogInputStream.nextBatch()).thenReturn(null, firstBatch);`
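   Mockito's consecutive stubbing used here (`thenReturn(null, firstBatch)`) 
returns the queued values in order and then keeps repeating the last one. A 
minimal dependency-free sketch of those semantics, with a plain `Supplier` 
standing in for the mocked stream (illustrative names only):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class ConsecutiveStubSketch {
    // Returns queued values in order, then keeps repeating the last one,
    // matching Mockito's consecutive-stubbing semantics.
    static <T> Supplier<T> consecutive(List<T> values) {
        int[] calls = {0};
        return () -> values.get(Math.min(calls[0]++, values.size() - 1));
    }

    public static void main(String[] args) {
        String firstBatch = "first-batch";
        // Analogue of: when(remoteLogInputStream.nextBatch()).thenReturn(null, firstBatch)
        Supplier<String> nextBatch = consecutive(Arrays.asList(null, firstBatch));
        System.out.println(nextBatch.get()); // null (batch compacted away)
        System.out.println(nextBatch.get()); // first-batch
        System.out.println(nextBatch.get()); // first-batch (last value repeats)
    }
}
```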









Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


kamalcph commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440097317


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is 
greater than or equal to the target offset
-remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   Add one comment above the `while` loop to mention the case that we handled 
for posterity.






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


kamalcph commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440089289


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
 }
 }
+// for testing
+public RemoteLogInputStream getRemoteLogInputStream(InputStream in) {

Review Comment:
   can we reduce the method access specifier to package-private? I would 
suggest removing this method and returning the actual stream instead of 
mockStream for `remoteStorageManager.fetchLogSegment`:
   
   ```java
   new ByteArrayInputStream(records(ts, startOffset, 
targetLeaderEpoch).buffer().array())
   ```
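   A minimal sketch of the suggestion, assuming a plain `ByteBuffer` stands in 
for the buffer produced by the `records(...)` test helper: back the fetched 
"remote segment" with a real in-memory stream rather than a mocked one.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class RealStreamSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for records(ts, startOffset, targetLeaderEpoch).buffer()
        ByteBuffer recordsBuffer = ByteBuffer.wrap(new byte[]{10, 20, 30});
        // Real bytes behind the stream, instead of a mocked InputStream
        InputStream segmentStream = new ByteArrayInputStream(recordsBuffer.array());
        System.out.println(segmentStream.read());      // 10
        System.out.println(segmentStream.available()); // 2
    }
}
```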



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2161,6 +2167,85 @@ RecordBatch findFirstBatch(RemoteLogInputStream 
remoteLogInputStream, long offse
 }
 }
 
+@Test
+public void testReadForFirstBatchInLogCompaction() throws 
RemoteStorageException, IOException {
+FileInputStream fileInputStream = mock(FileInputStream.class);
+RemoteLogInputStream remoteLogInputStream = 
mock(RemoteLogInputStream.class);
+ClassLoaderAwareRemoteStorageManager rsmManager = 
mock(ClassLoaderAwareRemoteStorageManager.class);
+RemoteLogSegmentMetadata segmentMetadata = 
mock(RemoteLogSegmentMetadata.class);
+LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+
when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), 
anyInt()))

Review Comment:
   There are two remote-storage-managers: `rsmManager` (local) and 
`remoteStorageManager` (global). Can we discard the latter one?



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2065,6 +2066,11 @@ public Optional 
fetchRemoteLogSegmentMetadata(TopicPar
 return Optional.of(segmentMetadata);
 }
 
+public Optional 
findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+  
Option leaderEpochFileCacheOption) {
+return Optional.ofNullable(null);

Review Comment:
   Can we use `Optional.empty()` instead?
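   For reference, `Optional.ofNullable(null)` and `Optional.empty()` produce 
the same empty `Optional`; `empty()` simply states the intent directly instead 
of routing a null literal through `ofNullable`. A quick check:

```java
import java.util.Optional;

public class OptionalEmptySketch {
    public static void main(String[] args) {
        // Both expressions yield the same empty Optional.
        Optional<Object> viaOfNullable = Optional.ofNullable(null);
        Optional<Object> viaEmpty = Optional.empty();
        System.out.println(viaOfNullable.equals(viaEmpty)); // true
        System.out.println(viaEmpty.isPresent());           // false
    }
}
```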






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1874100254

   @clolov Can we merge the request ?





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


clolov commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1439460528


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is 
greater than or equal to the target offset
-remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   Apologies for the delay, okay, this reasoning makes sense to me, but even if 
I am wrong the while loop won't really cause a performance impact, so I am 
happy with it!






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-28 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1870969987

   @clolov @divijvaidya  Do you have any additional review comments ?





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-24 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435854787


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is 
greater than or equal to the target offset
-remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   @clolov I believe this merging of segments happens in the background, but 
before the merging starts we disable compaction on the topic to make it ready 
for tiered storage, because of which the merging of segments never happens. 





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-24 Thread via GitHub


clolov commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435781798


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is 
greater than or equal to the target offset
-remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   Huh, interesting, I shall circle back to this, maybe I am not as familiar 
with compaction as I thought I was. To be honest, I was expecting that the 
segments 0 and 6 will be merged into one by compaction - I was under the 
impression that it doesn't leave empty segments.






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-22 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435445309


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is 
greater than or equal to the target offset
-remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   @clolov  We need to use a while loop because the next segment in the 
iteration may also be log compacted, and we may need to keep moving forward 
until we find a batch. Check the example I attached in the PR description, 
where 0.log and 6.log are both fully log compacted and the next batch exists 
only in 07.log. 
   Similar logic is used when fetching data from a local log segment: 
   
https://github.com/apache/kafka/blob/317f61dfd9a7f1443cf75b6a32568d1c81984d08/core/src/main/scala/kafka/log/LocalLog.scala#L425
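   The segment-skipping loop described above can be sketched as follows. This 
is illustrative only, not the actual RemoteLogManager API: each segment is 
modelled as the base offsets of its surviving batches, and fully compacted 
segments are empty lists.

```java
import java.util.List;
import java.util.OptionalLong;

public class SegmentScanSketch {
    // Scan forward across segments for the first batch whose base offset
    // reaches the target, skipping segments compaction has emptied entirely.
    static OptionalLong findFirstBatchAcrossSegments(List<List<Long>> segments, long targetOffset) {
        int segmentIndex = 0;
        Long firstBatch = null;
        // Mirrors: while (firstBatch == null && rlsMetadataOptional.isPresent())
        while (firstBatch == null && segmentIndex < segments.size()) {
            for (long baseOffset : segments.get(segmentIndex)) {
                if (baseOffset >= targetOffset) {
                    firstBatch = baseOffset;
                    break;
                }
            }
            segmentIndex++; // analogue of findNextSegmentMetadata(...)
        }
        return firstBatch == null ? OptionalLong.empty() : OptionalLong.of(firstBatch);
    }

    public static void main(String[] args) {
        // 0.log and 6.log fully compacted away; the next batch lives in 7.log.
        List<List<Long>> segments = List.of(List.of(), List.of(), List.of(70L, 75L));
        System.out.println(findFirstBatchAcrossSegments(segments, 25L)); // OptionalLong[70]
    }
}
```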









Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-22 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435444785


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2161,6 +2167,86 @@ RecordBatch findFirstBatch(RemoteLogInputStream 
remoteLogInputStream, long offse
 }
 }
 
+@Test
+public void testReadForFirstBatchInLogCompaction() throws 
RemoteStorageException, IOException {
+FileInputStream fileInputStream = mock(FileInputStream.class);
+RemoteLogInputStream remoteLogInputStream = 
mock(RemoteLogInputStream.class);
+ClassLoaderAwareRemoteStorageManager rsmManager = 
mock(ClassLoaderAwareRemoteStorageManager.class);
+RemoteLogSegmentMetadata segmentMetadata = 
mock(RemoteLogSegmentMetadata.class);
+LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+
when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), 
anyInt()))
+.thenAnswer(a -> fileInputStream);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+int fetchOffset = 0;
+int fetchMaxBytes = 10;
+int recordBatchSizeInBytes = fetchMaxBytes + 1;
+RecordBatch firstBatch = mock(RecordBatch.class);
+ArgumentCaptor capture = 
ArgumentCaptor.forClass(ByteBuffer.class);
+
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(
+Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, 
Optional.empty()
+);
+
+when(rsmManager.fetchLogSegment(any(), 
anyInt())).thenReturn(fileInputStream);
+when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), 
anyInt(), anyLong())).thenReturn(Optional.of(segmentMetadata), 
Optional.of(segmentMetadata));

Review Comment:
   Thanks for pointing that out. This may not even be required, as I already 
mocked the fetchRemoteLogSegmentMetadata function. 









Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-22 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435446137


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1412,8 +1422,8 @@ private void 
collectAbortedTransactionInLocalSegments(long startOffset,
 }
 }
 }
-
-private Optional 
findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+// visible for testing.
+Optional 
findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,

Option leaderEpochFileCacheOption) throws 
RemoteStorageException {

Review Comment:
   done






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-22 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435445309


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is greater than or equal to the target offset
-remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   @clolov We need to use a while loop because the next segment in the iteration may itself be fully log compacted, and we may need to keep moving forward until we find a batch. See the example I attached in the PR description, where 0.log and 6.log are both fully log compacted and the next batch exists only in 07.log.
   This is the same logic used for fetching data from local log segments:
   https://github.com/apache/kafka/blob/317f61dfd9a7f1443cf75b6a32568d1c81984d08/core/src/main/scala/kafka/log/LocalLog.scala#L425
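   The forward search described above can be sketched in simplified form. This is a minimal model in plain Java, not the actual Kafka classes: each remote segment is represented as a sorted array of record-batch offsets, and a fully compacted segment is an empty array, so the search must be able to skip more than one segment in a row (as with 0.log and 6.log in the example).

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.OptionalLong;

   public class FindFirstBatchSketch {

       // Walk segments forward until a batch with offset >= target is found,
       // mirroring the while-loop over remote segment metadata discussed above.
       static OptionalLong findFirstBatch(List<long[]> segments, long target) {
           for (long[] segment : segments) {          // advance segment by segment
               for (long batchOffset : segment) {     // scan batches within a segment
                   if (batchOffset >= target) {
                       return OptionalLong.of(batchOffset);
                   }
               }
               // every batch in this segment was compacted away below the
               // target; fall through to the next segment
           }
           return OptionalLong.empty();
       }

       public static void main(String[] args) {
           // first two segments fully compacted; data survives only in the third
           List<long[]> segments = Arrays.asList(new long[]{}, new long[]{}, new long[]{7, 9});
           System.out.println(findFirstBatch(segments, 0)); // OptionalLong[7]
       }
   }
   ```

   A single look-ahead to the next segment would return empty here, which is why the search keeps iterating until it either finds a batch or runs out of segments.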






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-22 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435444785


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2161,6 +2167,86 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset
 }
 }
 
+@Test
+public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException {
+FileInputStream fileInputStream = mock(FileInputStream.class);
+RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class);
+ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+.thenAnswer(a -> fileInputStream);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+int fetchOffset = 0;
+int fetchMaxBytes = 10;
+int recordBatchSizeInBytes = fetchMaxBytes + 1;
+RecordBatch firstBatch = mock(RecordBatch.class);
+ArgumentCaptor<ByteBuffer> capture = ArgumentCaptor.forClass(ByteBuffer.class);
+
+FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty()
+);
+
+when(rsmManager.fetchLogSegment(any(), anyInt())).thenReturn(fileInputStream);
+when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(Optional.of(segmentMetadata), Optional.of(segmentMetadata));
Review Comment:
   We are calling it two times in the unit test, so we need this value to be returned two times.
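   For context, Mockito's consecutive stubbing `thenReturn(v1, v2, ...)` hands out the listed values call by call and then keeps repeating the last one. A hand-rolled sketch of that contract (the `consecutive` helper below is hypothetical, not Mockito API):

   ```java
   import java.util.function.Supplier;

   public class ConsecutiveStubSketch {

       // Models Mockito's thenReturn(v1, v2, ...): each call yields the next
       // value; once the list is exhausted, the last value keeps being returned.
       @SafeVarargs
       static <T> Supplier<T> consecutive(T... values) {
           int[] idx = {0};
           return () -> values[Math.min(idx[0]++, values.length - 1)];
       }

       public static void main(String[] args) {
           Supplier<String> stub = consecutive("segment-0", "segment-1");
           System.out.println(stub.get()); // segment-0 (first lookup)
           System.out.println(stub.get()); // segment-1 (second lookup)
           System.out.println(stub.get()); // segment-1 (last value repeats)
       }
   }
   ```

   Because the last stubbed value repeats, a single `Optional.of(segmentMetadata)` would also satisfy two calls; listing it twice just makes the two expected lookups explicit.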






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-22 Thread via GitHub


clolov commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435282790


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2161,6 +2167,86 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset
 }
 }
 
+@Test
+public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException {
+FileInputStream fileInputStream = mock(FileInputStream.class);
+RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class);
+ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+.thenAnswer(a -> fileInputStream);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+int fetchOffset = 0;
+int fetchMaxBytes = 10;
+int recordBatchSizeInBytes = fetchMaxBytes + 1;
+RecordBatch firstBatch = mock(RecordBatch.class);
+ArgumentCaptor<ByteBuffer> capture = ArgumentCaptor.forClass(ByteBuffer.class);
+
+FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty()
+);
+
+when(rsmManager.fetchLogSegment(any(), anyInt())).thenReturn(fileInputStream);
+when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(Optional.of(segmentMetadata), Optional.of(segmentMetadata));

Review Comment:
   Isn't one Optional.of(segmentMetadata) enough?



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1412,8 +1422,8 @@ private void collectAbortedTransactionInLocalSegments(long startOffset,
 }
 }
 }
-
-private Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+// visible for testing.
+Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,

Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {

Review Comment:
   ```suggestion
                                       Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {
   ```



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is greater than or equal to the target offset
-remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   I am not super hung up on this, but don't you just need to look forward once? In other words, do you have a guarantee that if you do not find the offset in this segment you are bound to find it in the next? If so, can you just look in the next segment and not use a while loop?






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-22 Thread via GitHub


clolov commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1867443497

   Thank you for the contribution! I will aim to provide a review throughout 
the day!

