Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
satishd merged PR #15060: URL: https://github.com/apache/kafka/pull/15060 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
satishd commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1891112974

Merging it to trunk as the test failures are unrelated to this change.
satishd commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1890811313

@iit2009060 As we discussed earlier in the [comment](https://github.com/apache/kafka/pull/15060#issuecomment-1882487316), this issue only applies to the compact policy; the behaviour under the retention policy is as expected. As I mentioned in my earlier [comment](https://github.com/apache/kafka/pull/15060#issuecomment-1880861009), I am fine with merging this PR, which partially addresses the compact-policy problem by iterating through the remote log segments, and we can have a follow-up PR for the remaining cases. We can discuss the possible cases and solutions in [KAFKA-15388](https://issues.apache.org/jira/browse/KAFKA-15388).
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1890348438

@satishd @kamalcph @divijvaidya I am able to reproduce the issue @satishd mentioned; I had to introduce a delay into the movement of segments to remote storage through a hacky code change.

1. Created topic test10.
2. Set segment.bytes=100 so that each segment contains only one offset.
3. Set the cleanup policy to compact.
4. Produced some messages (1:1,1:2,1:3,1:4,1:5,1:6,1:7).
5. Waited while log compaction ran.
6. Removed the compact cleanup policy from the topic.
7. Enabled remote storage (remote.storage.enable=true). I added code in the RemoteLogManager that copies only the remote segments whose base offset <= 1, so that we end up in a situation where the data is not available in the remote segments but is available in the local/active segments. (screenshot)
8. Local segments 0 and 1 were removed and moved to remote storage, where their size is zero because the data was removed by log compaction. Observe that the local segments contain data only for offsets >= 2.

**Local log segments view** (screenshot)

**Remote log segments view** (screenshot)

The remote log contains only two segments, 0 and 1, both empty because of log compaction.

9. Ran the console consumer: `bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test10 --offset 0 --partition 0 --property print.offset=true --property print.key=true --property print.value=true --consumer-property auto.offset.reset=earliest`

This command never makes progress, because the RemoteLogManager returns empty records and we do not yet have a mechanism to handle the case where it returns empty records. (screenshots)

@satishd @divijvaidya @kamalcph Let me know if you have a suggestion to fix this use case.
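The stall in the last step can be sketched with a toy model (illustrative Java only; the class and method names are hypothetical, not Kafka's actual API): the consumer's fetch offset advances only past returned records, so an empty remote read leaves it retrying the same offset forever.

```java
import java.util.List;

// Toy model of the stuck consumer: segments 0 and 1 exist in remote storage
// but were emptied by compaction, so reads below offset 2 return no records.
public class StuckFetchSketch {
    // Returns an empty list for offsets served (emptily) from remote storage,
    // and null to mean "this offset is served from local segments".
    static List<Integer> remoteRead(long fetchOffset) {
        return fetchOffset < 2 ? List.of() : null;
    }

    public static void main(String[] args) {
        long fetchOffset = 0;
        int attempts = 0;
        while (attempts < 5) { // capped here; the real consumer retries forever
            List<Integer> records = remoteRead(fetchOffset);
            if (records != null && records.isEmpty()) {
                attempts++; // empty response: fetch offset does not advance
            } else {
                break;
            }
        }
        System.out.println("offset=" + fetchOffset + " attempts=" + attempts);
    }
}
```

Under this model the consumer loops on offset 0 indefinitely; returning something other than empty records (or advancing past the compacted range) is what breaks the loop.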
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1888750316

> https://issues.apache.org/jira/browse/KAFKA-16088

Thanks @kamalcph for the discussion. @satishd I discussed the above cases with @kamalcph:

1. If auto.offset.reset is **latest**, we can still lose messages in the case mentioned above. That is the expected behaviour with the **latest** configuration.
2. If auto.offset.reset is **earliest**, the issue never occurs.

@satishd We can merge the current PR if that is feasible. I will continue working to see if I can reproduce the log-compaction case you mentioned, and will open a separate PR once the issue is reproducible.
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1882690345

> @iit2009060 In step 7, it is anticipated that setting the fetch offset to 1 should follow the specified process to trigger an `OutOfOffsetRangeError` with empty records as I mentioned [earlier](https://github.com/apache/kafka/pull/15060#issuecomment-1882487316).
>
> > I will try with from-beginning option and update the behaviour.
>
> Sure.
>
> > Even for the latest configuration it should work?
>
> `latest` option also should work. To validate this, run the step with the `latest` setting and concurrently run `kafka-console-producer`, generating additional messages beyond the latest offset where the `OutOfOffsetRangeError` was encountered. Subsequently, the consumer should be able to consume the newly produced sequence of messages.

@satishd I will try to write an integration or unit test depicting the scenario I mentioned. However, as far as I noticed, the additional messages I produced in step 5 could not be retrieved with a fetch request at offset 1, while a request at offset 6 was able to proceed.
satishd commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1882660480

@iit2009060 In step 7, it is anticipated that setting the fetch offset to 1 should follow the specified process to trigger an `OutOfOffsetRangeError` with empty records as I mentioned [earlier](https://github.com/apache/kafka/pull/15060#issuecomment-1882487316).

> I will try with from-beginning option and update the behaviour.

Sure.

> Even for the latest configuration it should work?

`latest` option also should work. To validate this, run the step with the `latest` setting and concurrently run `kafka-console-producer`, generating additional messages beyond the latest offset where the `OutOfOffsetRangeError` was encountered. Subsequently, the consumer should be able to consume the newly produced sequence of messages.
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1882517442

> @iit2009060 The behaviour that you mentioned in the [comment](https://github.com/apache/kafka/pull/15060#issuecomment-1879657273) seems to be working as expected. The client seems to be receiving OffsetOutOfRangeException. When a consumer client sends a request for fetch offset that is out of range then it receives offset out of range error and it sends the next request based on `auto-offset-reset` strategy that is set on the consumer client. It seems the default value for `kafka-console-consumer` is `latest` if not set through an `auto.offset.reset` property or does not use `--from-beginning` argument. In your case, it may be trying to fetch the latest offset in the next request and there are no messages published to the topic beyond that offset. You can try running the same scenario of fetching out of range offset 1 with `--from-beginning` and observe the behavior.
>
> We can syncup offline to understand your scenario better.

@satishd Check the **5th step** mentioned in the [comment](https://github.com/apache/kafka/pull/15060#issuecomment-1879657273), where I produced some data. In the **7th step** I made offset requests with both 1 and 6; offset 6 worked fine, but the request with offset 1 goes through these steps:

1. It handles the OutOfOffsetRangeError.
2. It goes through the if condition mentioned in **step 8**.
3. It then goes through the else part: `} else { createLogReadResult(exception) }`
4. As the **step 9** screenshot shows, it creates an empty-records response with the exception wrapped in the LogReadResult.

The default configuration is **latest** in the console consumer. Should it work even with the **latest** configuration? I will try the `--from-beginning` option and update with the behaviour.
satishd commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1882487316

@iit2009060 The behaviour that you mentioned in the [comment](https://github.com/apache/kafka/pull/15060#issuecomment-1879657273) seems to be working as expected: the client is receiving an OffsetOutOfRangeException. When a consumer client sends a fetch request for an offset that is out of range, it receives an offset-out-of-range error and sends the next request based on the `auto.offset.reset` strategy that is set on the consumer client. The default for `kafka-console-consumer` seems to be `latest` when it is not set through an `auto.offset.reset` property and the `--from-beginning` argument is not used. In your case, it may be trying to fetch the latest offset in the next request, and there are no messages published to the topic beyond that offset. You can try running the same scenario of fetching the out-of-range offset 1 with `--from-beginning` and observe the behavior.

We can sync up offline to understand your scenario better.
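The reset behaviour described above can be sketched as follows (illustrative Java; `resetOffset` is a hypothetical helper, not the consumer client's real API): after an out-of-range fetch, the next offset is chosen by the `auto.offset.reset` policy.

```java
// Toy model of consumer-side offset reset after OffsetOutOfRangeException.
public class OffsetResetSketch {
    static long resetOffset(String policy, long logStartOffset, long logEndOffset) {
        switch (policy) {
            case "earliest": return logStartOffset; // --from-beginning behaviour
            case "latest":   return logEndOffset;   // console consumer default
            default: throw new IllegalStateException("no reset policy: propagate error");
        }
    }

    public static void main(String[] args) {
        // Fetch offset 1 is below logStartOffset 6, so it is out of range.
        System.out.println(resetOffset("earliest", 6, 10)); // resumes at 6
        System.out.println(resetOffset("latest", 6, 10));   // jumps to 10
    }
}
```

With `latest`, everything between the log start and end offsets is skipped, which matches the observation that the existing messages appear lost unless new ones are produced after the reset.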
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1880890006

> @iit2009060 [KAFKA-15388](https://issues.apache.org/jira/browse/KAFKA-15388) is about addressing the fetch requests for tiered storage enabled topics that have compacted offsets in a partition before their cleanup policy is changed to delete. This PR only covers if the next offset is available only within the remote log segments. But there is a case when the offset is available in the subsequent local segments (not only in active local segments but in any of the local only segments) that have not yet been copied to remote storage. The solution does not address [KAFKA-15388](https://issues.apache.org/jira/browse/KAFKA-15388) completely.
>
> I am fine to merge this and have a followup PR for the mentioned scenario to resolve [KAFKA-15388](https://issues.apache.org/jira/browse/KAFKA-15388) addressing the compacted topics before tiered storage is enabled.

@satishd Yes, as you mentioned, the current PR does not cover reading from the local log segments, i.e. the case where data is not available in the remote log segments, which can happen because of either 1. log compaction or 2. retention. Since this scenario touches multiple features and is not specific to log compaction, the intention behind creating a separate [ticket](https://issues.apache.org/jira/browse/KAFKA-16088) was to provide a dedicated space for addressing the broader issue, encompassing scenarios beyond the scope of the current PR. However, I am also fine with a follow-up PR if we want to merge this.
satishd commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1880861009

@iit2009060 KAFKA-15388 is about addressing fetch requests for tiered-storage-enabled topics that had offsets compacted in a partition before their cleanup policy was changed to delete. This PR only covers the case where the next offset is available within the remote log segments. But there is also the case where the offset is available in the subsequent local segments (not only in the active local segment but in any of the local-only segments) that have not yet been copied to remote storage. The solution does not address KAFKA-15388 completely.

I am fine to merge this and have a follow-up PR for the mentioned scenario to resolve KAFKA-15388, addressing topics that were compacted before tiered storage was enabled.
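The part of KAFKA-15388 this PR does address can be sketched roughly like this (illustrative Java with hypothetical names, not the actual RemoteLogManager code): scan forward through the remote segments for the first batch at or above the fetch offset, and signal the caller to fall back to local segments when none exists.

```java
import java.util.List;
import java.util.OptionalLong;

// Toy model of scanning remote segments for the next available offset.
public class RemoteSegmentScan {
    // A segment is just its base offset plus the record offsets it still holds
    // (compaction may have removed some, or all, of them).
    record Segment(long baseOffset, List<Long> offsets) {}

    static OptionalLong nextAvailableOffset(List<Segment> remoteSegments, long fetchOffset) {
        for (Segment seg : remoteSegments) {
            for (long off : seg.offsets()) {
                if (off >= fetchOffset) return OptionalLong.of(off);
            }
        }
        return OptionalLong.empty(); // caller must fall back to local segments
    }

    public static void main(String[] args) {
        // remote-seg-10[10..20], remote-seg-21[21..30] with offsets 25..30 compacted away
        List<Segment> remote = List.of(
                new Segment(10, List.of(10L, 12L, 20L)),
                new Segment(21, List.of(21L, 24L)));
        System.out.println(nextAvailableOffset(remote, 13)); // found within remote segments
        System.out.println(nextAvailableOffset(remote, 25)); // empty: read local-seg-31 instead
    }
}
```

The empty result for offset 25 is exactly the gap described above: the remaining data lives only in local segments, which this PR does not yet consult.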
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1880775216

> @satishd @divijvaidya @kamalcph I am able to reproduce the above scenario using retention feature instead of log compaction. The overall problem is we are sending MemoryRecords.Empty when unable to find offset even though active segments can still have the data. [...]
> 7. When I am trying to fetch offset 1 from the topic test8 partition 0, it never able to respond. Ideally it should pick the data from the active segment whose offset starts at 6.
kamalcph commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1879987059

> In general this is happening because The next fetch request offset increments only when there is a record list. But the Remote fetch implementation by default if not able to find the records sents Empty Records.

An empty-records response is sent back to the consumer so that it can retry the request if needed. In a non-compacted topic, the records are expected to be present in a contiguous manner. Shall we list the scenarios where the record offsets won't be contiguous for a regular (non-compacted) topic, or write a unit test to reproduce this problem?
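For a compacted topic, the non-contiguous offsets are easy to see with a toy model (illustrative Java, not the cleaner's actual implementation): compaction keeps only the latest record per key, but retained records keep their original offsets, so gaps appear.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of log compaction: input is (offset, key) pairs in produce order;
// output is the sorted offsets that survive compaction.
public class CompactionGapsSketch {
    static List<Long> retainedOffsets(long[][] log) {
        Map<Long, Long> latestByKey = new LinkedHashMap<>();
        for (long[] rec : log) latestByKey.put(rec[1], rec[0]); // key -> last offset seen
        List<Long> retained = new ArrayList<>(latestByKey.values());
        Collections.sort(retained);
        return retained;
    }

    public static void main(String[] args) {
        // Offsets 0..4 written for keys 1, 1, 2, 1, 2.
        long[][] log = {{0, 1}, {1, 1}, {2, 2}, {3, 1}, {4, 2}};
        // Only the latest record per key survives; offsets 0..2 are gone.
        System.out.println(retainedOffsets(log)); // prints [3, 4]
    }
}
```

A fetch at offset 0 or 1 against this log has no record at that exact offset, which is the non-contiguous case the question above is probing; for a delete-policy topic, by contrast, offsets below the log start offset are removed wholesale rather than leaving interior gaps.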
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1879980873

> @satishd @divijvaidya @kamalcph I am able to reproduce the above scenario using retention feature instead of log compaction. The overall problem is we are sending MemoryRecords.Empty when unable to find offset even though active segments can still have the data. [...]
> 7. When I am trying to fetch offset 1 from the topic test8 partition 0, it never able to respond. Ideally it should pick the data from the active segment whose offset starts at 6.
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1879657273

@satishd @divijvaidya @kamalcph I am able to reproduce the above scenario using the retention feature instead of log compaction. The overall problem is that we send MemoryRecords.EMPTY when unable to find the offset, even though the active segments can still have the data. Consider the scenario below:

1. Create topic test8 with partition 0 and with remote storage enabled.
2. Current status of the topic (offsets 0, 2, 3, 4). (screenshot)
3. Make a fetch request with offset 1. (screenshot)
4. To delete the remote segments, set retention.ms=1000. (screenshot)
5. Once the data is deleted, reset the local and remote retention to 1 hour.
6. Produce some data. (screenshot)
7. Trying to fetch offset 1 from topic test8 partition 0 never gets a response. Ideally it should pick up the data from the active segment, whose offsets start at 6. (screenshot)
8. This happens because the requested offset 1 is not greater than log.logStartOffset, which is 6. (screenshot)
9. We create an empty-records response. (screenshot)

In general this happens because the next fetch request offset only advances when there is a record list, but the remote fetch implementation by default sends empty records when it cannot find any. We should create a separate JIRA to track this: the issue is not specific to log compaction but will happen whenever we send empty records from remote storage.
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877542832

> @iit2009060 You may have mistyped as controller but it does not have any role in the fetch path here. Remote fetch is executed through a specific purgatory with DelayedRemoteFetch and read operation/callback is executed in a specific thread pool. It returns empty records when there is no data for a specific offset in the remote storage.

Yes @satishd, correct. I am still going through the code, but I need a few inputs:

1. If we send empty records, how does it determine the next offset to be fetched? In LogOffsetMetadata we always return the requested fetch offset. Or does it keep requesting the same fetch offset?
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877528171

> Thanks @iit2009060 for the PR.
>
> Let us say there are two segments in remote storage and subsequent segments in local storage. remote-seg-10[10, 20], remote-seg-21[21, 30] : offsets 25 to 30 are compacted. local-seg-31[31, 40]
>
> When a fetch request comes for offsets within [25, 30] then it should move to the local segment as those offsets might have been compacted earlier. Did you also cover this scenario in this PR?

@satishd I am trying to reproduce the case where the last offsets of the segment before the active segment are compacted away. I tried locally but was not able to reproduce the above scenario; there always exists a last offset in each segment earlier than the active segment. As per the article https://towardsdatascience.com/log-compacted-topics-in-apache-kafka-b1aa1e4665a7, records in the active segment are never compacted. @satishd Do you have steps in mind to regenerate the above scenario?
satishd commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877333095

@iit2009060 You may have mistyped as controller, but it does not have any role in the fetch path here. Remote fetch is executed through a specific purgatory with DelayedRemoteFetch, and the read operation/callback is executed in a specific thread pool. It returns empty records when there is no data for a specific offset in the remote storage.
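That flow can be sketched as follows (illustrative Java with hypothetical names; Kafka's actual DelayedRemoteFetch and purgatory machinery are considerably more involved): the remote read runs on a reader thread pool, and the delayed fetch completes with whatever the read produced, including zero records.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model: the broker hands the remote read to a worker thread and later
// completes the delayed fetch with the result, even if it is empty.
public class DelayedRemoteFetchSketch {
    // Returns the number of records found for the offset in "remote storage".
    // (The real code reuses a shared pool; one is created per call here only
    // to keep the sketch self-contained.)
    static Future<Integer> readRemote(long fetchOffset) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(() -> fetchOffset < 2 ? 0 : 1); // offsets < 2 compacted away
        } finally {
            pool.shutdown(); // lets the submitted task finish, then terminates
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("records=" + readRemote(1).get()); // empty result, no error
    }
}
```

The point of the sketch is that an empty result is a normal completion of the delayed fetch, not an exception, which is why the consumer sees empty records rather than an error it could react to.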
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1876338999

> Thanks @iit2009060 for the PR.
>
> Let us say there are two segments in remote storage and subsequent segments in local storage. remote-seg-10[10, 20], remote-seg-21[21, 30] : offsets 25 to 30 are compacted. local-seg-31[31, 40]
>
> When a fetch request comes for offsets within [25, 30] then it should move to the local segment as those offsets might have been compacted earlier. Did you also cover this scenario in this PR?

@satishd I have not tested this case explicitly. In this case the RemoteLogManager would return a null firstBatch, and the controller (the class which invokes the RemoteLogManager read) should take care of reading the next segment locally. Let me reproduce this issue locally and update with the behaviour.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440153281 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); } } +// for testing +public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { Review Comment: @kamalcph Yes sure. Updated.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
kamalcph commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440146317 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); } } +// for testing +public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { Review Comment: @iit2009060 can we remove the `public` method access specifier?
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1874937679 > LGTM, thanks for the patch! > > This patch handles the FETCH request for previously compacted topic segments uploaded to remote storage. We also have to go through the > > 1. `upload`, `deletion` path and > 2. RemoteLogMetadataCache internal state > > for both normal and unclean-leader-election scenarios. https://issues.apache.org/jira/browse/KAFKA-15388 As mentioned in the description, the upload and delete paths will not be impacted.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440133601 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); -int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); InputStream remoteSegInputStream = null; try { -// Search forward for the position of the last offset that is greater than or equal to the target offset -remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); -RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); - -RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); - +int startPos = 0; +RecordBatch firstBatch = null; +while (firstBatch == null && rlsMetadataOptional.isPresent()) { Review Comment: done
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440126550 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2065,6 +2066,11 @@ public Optional fetchRemoteLogSegmentMetadata(TopicPar return Optional.of(segmentMetadata); } +public Optional findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, + Option leaderEpochFileCacheOption) { +return Optional.ofNullable(null); Review Comment: done. ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2161,6 +2167,85 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse } } +@Test +public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException { +FileInputStream fileInputStream = mock(FileInputStream.class); +RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class); +ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); +RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); +LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); +when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) Review Comment: done
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440116425 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); } } +// for testing +public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { Review Comment: @kamalcph Do we really see a need for passing an actual ByteArrayInputStream? It would make it difficult to mock the behaviour.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440115636 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); } } +// for testing +public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { Review Comment: @kamalcph I want to mock the RemoteLogInputStream so that I can invoke and mock the nextBatch function accordingly. In the first iteration I want firstBatch to be null, and in the next iteration I want firstBatch to be available. ` when(remoteLogInputStream.nextBatch()).thenReturn(null, firstBatch);`
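[Editor's note] The consecutive stubbing used here (`thenReturn(null, firstBatch)`: first call returns null, subsequent calls return the batch) can be mimicked without Mockito. A minimal hand-rolled sketch, with batches modelled as plain strings and all names hypothetical:

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for when(remoteLogInputStream.nextBatch()).thenReturn(null, firstBatch):
// yields the queued answers in order, then keeps repeating the last one,
// mirroring Mockito's consecutive-stubbing semantics.
class StubRemoteLogInputStream {
    private final List<String> answers;
    private int calls = 0;

    StubRemoteLogInputStream(String... batches) {
        this.answers = Arrays.asList(batches);
    }

    String nextBatch() {
        int i = Math.min(calls++, answers.size() - 1);
        return answers.get(i);
    }
}
```

A stub built with `(null, "batch")` returns null on the first call (a fully compacted segment) and the batch on every later call, which is exactly the two-iteration behaviour the test needs.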
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
kamalcph commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440097317 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); -int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); InputStream remoteSegInputStream = null; try { -// Search forward for the position of the last offset that is greater than or equal to the target offset -remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); -RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); - -RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); - +int startPos = 0; +RecordBatch firstBatch = null; +while (firstBatch == null && rlsMetadataOptional.isPresent()) { Review Comment: Add one comment above the `while` loop to mention the case that we handled for posterity.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
kamalcph commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1440089289 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); } } +// for testing +public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { Review Comment: can we reduce the method access specifier to package-private? I would suggest removing this method and returning the actual stream instead of mockStream for `remoteStorageManager.fetchLogSegment`: ```java new ByteArrayInputStream(records(ts, startOffset, targetLeaderEpoch).buffer().array()) ``` ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2161,6 +2167,85 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse } } +@Test +public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException { +FileInputStream fileInputStream = mock(FileInputStream.class); +RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class); +ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); +RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); +LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); +when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) Review Comment: There are two remote storage managers: `rsmManager` (local) and `remoteStorageManager` (global). Can we discard the latter one? 
## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2065,6 +2066,11 @@ public Optional fetchRemoteLogSegmentMetadata(TopicPar return Optional.of(segmentMetadata); } +public Optional findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, + Option leaderEpochFileCacheOption) { +return Optional.ofNullable(null); Review Comment: Can we use `Optional.empty()` instead?
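[Editor's note] For reference, the two forms are equivalent at runtime; `Optional.empty()` simply states the intent directly instead of routing a needless null literal through `ofNullable`:

```java
import java.util.Optional;

// Both expressions produce an empty Optional; Optional.empty() expresses the
// intent directly, which is what the review comment above is asking for.
class OptionalEmptySketch {
    static Optional<String> viaOfNullable() {
        return Optional.ofNullable(null); // works, but obscures the intent
    }

    static Optional<String> viaEmpty() {
        return Optional.empty(); // preferred: clearly an empty result
    }
}
```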
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1874100254 @clolov Can we merge the request?
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
clolov commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1439460528 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); -int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); InputStream remoteSegInputStream = null; try { -// Search forward for the position of the last offset that is greater than or equal to the target offset -remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); -RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); - -RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); - +int startPos = 0; +RecordBatch firstBatch = null; +while (firstBatch == null && rlsMetadataOptional.isPresent()) { Review Comment: Apologies for the delay, okay, this reasoning makes sense to me, but even if I am wrong the while loop won't really cause a performance impact, so I am happy with it!
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1870969987 @clolov @divijvaidya Do you have any additional review comments?
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1435854787 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); -int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); InputStream remoteSegInputStream = null; try { -// Search forward for the position of the last offset that is greater than or equal to the target offset -remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); -RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); - -RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); - +int startPos = 0; +RecordBatch firstBatch = null; +while (firstBatch == null && rlsMetadataOptional.isPresent()) { Review Comment: @clolov I believe this merging of segments happens in the background, but before the merging starts we disable compaction on the topic to make it ready for tiered storage, because of which the merging of segments never happens.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
clolov commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1435781798 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); -int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); InputStream remoteSegInputStream = null; try { -// Search forward for the position of the last offset that is greater than or equal to the target offset -remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); -RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); - -RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); - +int startPos = 0; +RecordBatch firstBatch = null; +while (firstBatch == null && rlsMetadataOptional.isPresent()) { Review Comment: Huh, interesting, I shall circle back to this, maybe I am not as familiar with compaction as I thought I was. To be honest, I was expecting that the segments 0 and 6 will be merged into one by compaction - I was under the impression that it doesn't leave empty segments.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1435445309 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); -int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); InputStream remoteSegInputStream = null; try { -// Search forward for the position of the last offset that is greater than or equal to the target offset -remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); -RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); - -RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); - +int startPos = 0; +RecordBatch firstBatch = null; +while (firstBatch == null && rlsMetadataOptional.isPresent()) { Review Comment: @clolov We need to use a while loop because there is a possibility that the next segment in the iteration is also log compacted, and we may need to move further until we find one. Check the example I attached in the PR description, where 0.log and 6.log are both fully log compacted and the next batch exists only in 07.log. Similar logic is used when fetching data from the log segment. https://github.com/apache/kafka/blob/317f61dfd9a7f1443cf75b6a32568d1c81984d08/core/src/main/scala/kafka/log/LocalLog.scala#L425
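[Editor's note] The loop being defended here can be modelled in isolation. A simplified sketch (not the actual RemoteLogManager types; a segment is modelled as just the list of batch base offsets it still contains after compaction): keep advancing to the next remote segment while no batch at or above the fetch offset is found.

```java
import java.util.List;
import java.util.Optional;

// Simplified model of the fix: fully compacted segments (empty lists) yield
// no matching batch, so the search falls through to the next segment until a
// batch with offset >= fetchOffset is found or the segments run out.
class FirstBatchSearchSketch {
    static Optional<Long> findFirstBatch(List<List<Long>> remoteSegments, long fetchOffset) {
        Optional<Long> firstBatch = Optional.empty();
        int segment = 0;
        while (!firstBatch.isPresent() && segment < remoteSegments.size()) {
            // Search forward within the current segment for the first batch
            // whose base offset is at or after the fetch offset.
            firstBatch = remoteSegments.get(segment).stream()
                    .filter(offset -> offset >= fetchOffset)
                    .findFirst();
            segment++; // nothing matched: fall through to the next segment
        }
        return firstBatch;
    }
}
```

With segments [], [] (both fully compacted) followed by [31, 35], a fetch at offset 25 skips the two empty segments and lands on the batch at offset 31, matching the 0.log/6.log/07.log example above.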
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1435444785 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2161,6 +2167,86 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse } } +@Test +public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException { +FileInputStream fileInputStream = mock(FileInputStream.class); +RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class); +ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); +RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); +LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); +when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) +.thenAnswer(a -> fileInputStream); +when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + +int fetchOffset = 0; +int fetchMaxBytes = 10; +int recordBatchSizeInBytes = fetchMaxBytes + 1; +RecordBatch firstBatch = mock(RecordBatch.class); +ArgumentCaptor capture = ArgumentCaptor.forClass(ByteBuffer.class); + +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( +Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty() +); + +when(rsmManager.fetchLogSegment(any(), anyInt())).thenReturn(fileInputStream); +when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(Optional.of(segmentMetadata), Optional.of(segmentMetadata)); Review Comment: Thanks for pointing that out. This may not even be required, as I already mocked the fetchRemoteLogSegmentMetadata function.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1435446137 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1412,8 +1422,8 @@ private void collectAbortedTransactionInLocalSegments(long startOffset, } } } - -private Optional findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, +// visible for testing. Optional findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, Option leaderEpochFileCacheOption) throws RemoteStorageException { Review Comment: done
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1435444785 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2161,6 +2167,86 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse } } +@Test +public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException { +FileInputStream fileInputStream = mock(FileInputStream.class); +RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class); +ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); +RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); +LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); +when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) +.thenAnswer(a -> fileInputStream); +when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + +int fetchOffset = 0; +int fetchMaxBytes = 10; +int recordBatchSizeInBytes = fetchMaxBytes + 1; +RecordBatch firstBatch = mock(RecordBatch.class); +ArgumentCaptor capture = ArgumentCaptor.forClass(ByteBuffer.class); + +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( +Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty() +); + +when(rsmManager.fetchLogSegment(any(), anyInt())).thenReturn(fileInputStream); +when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(Optional.of(segmentMetadata), Optional.of(segmentMetadata)); Review Comment: We are calling it two times in the unit test, so we need this value to be returned twice.
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
clolov commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1435282790

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

In `testReadForFirstBatchInLogCompaction`, on the stubbing line:

```java
when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), anyInt(), anyLong()))
        .thenReturn(Optional.of(segmentMetadata), Optional.of(segmentMetadata));
```

Review Comment: Isn't one `Optional.of(segmentMetadata)` enough?

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
-private Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+// visible for testing.
+Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
                                                                    Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {
```

Review Comment:
```suggestion
                                                           Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {
```
(realigns the second parameter now that the method has lost its `private` modifier)

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
 @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
         RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
-        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
         InputStream remoteSegInputStream = null;
         try {
-            // Search forward for the position of the last offset that is greater than or equal to the target offset
-            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
-
-            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
-
+            int startPos = 0;
+            RecordBatch firstBatch = null;
+            while (firstBatch == null && rlsMetadataOptional.isPresent()) {
```

Review Comment: I am not super hung-up on this, but don't you just need to look forward once? That is, do you have a guarantee that if you do not find the offset in this segment you are bound to find it in the next? If that is the case, can you just look in the next segment and not use a while loop?
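The while-loop question above is the crux of the compaction fix: with a compact cleanup policy, every batch at or beyond the fetch offset may have been removed from a segment, and that can be true for several consecutive segments, so a single forward hop is not guaranteed to find a batch. A self-contained sketch of the iteration, using simplified `Batch`/`Segment` stand-ins rather than the real `RemoteLogManager` types:

```java
import java.util.List;
import java.util.Optional;

final class RemoteReadSketch {
    // Simplified stand-ins for RecordBatch / RemoteLogSegmentMetadata.
    record Batch(long baseOffset, long lastOffset) {}
    record Segment(List<Batch> batches) {}

    // First batch in the segment whose last offset reaches the target, if any.
    static Optional<Batch> findFirstBatch(Segment segment, long offset) {
        return segment.batches().stream()
                .filter(b -> b.lastOffset() >= offset)
                .findFirst();
    }

    // Walk segments until some batch covers the offset. Under compaction,
    // any number of consecutive segments may have lost every batch at or
    // beyond the target, so a single "look in the next segment" step is
    // not guaranteed to succeed; iterating until a batch is found is.
    static Optional<Batch> read(List<Segment> segments, long offset) {
        for (Segment segment : segments) {
            Optional<Batch> first = findFirstBatch(segment, offset);
            if (first.isPresent()) {
                return first;
            }
        }
        return Optional.empty(); // offset lies beyond all remote data
    }
}
```

For example, with segments holding batches covering offsets 0..5, nothing (fully compacted away), and 20..30, a fetch at offset 10 is served from the third segment only because the loop continues past the empty middle segment; a single next-segment lookahead would come back empty.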
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
clolov commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1867443497 Thank you for the contribution! I will aim to provide a review throughout the day!