JimmyWang6 commented on code in PR #20708:
URL: https://github.com/apache/kafka/pull/20708#discussion_r2434915587
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java:
##########
@@ -385,10 +386,25 @@ void resetOffsets() {
if (!(GroupState.EMPTY.equals(shareGroupDescription.groupState()) || GroupState.DEAD.equals(shareGroupDescription.groupState()))) {
CommandLineUtils.printErrorAndExit(String.format("Share group '%s' is not empty.", groupId));
}
- Map<TopicPartition, OffsetAndMetadata> offsetsToReset = prepareOffsetsToReset(groupId);
- if (offsetsToReset == null) {
- return;
+ resetOffsetsForInactiveGroup(groupId);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ if (cause instanceof GroupIdNotFoundException) {
+ resetOffsetsForInactiveGroup(groupId);
Review Comment:
Just wondering, why is there a retry mechanism when
`GroupIdNotFoundException` is thrown? Is this for better reliability? I see
that `ConsumerGroupCommand` also does this.
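For reference, the control flow in question can be sketched in isolation. This is only an illustration of one possible reading, in which the second call is a fallback for a group that does not exist rather than a retry; it assumes the exception originates from the describe step. All names here (`FallbackOnMissingGroup`, `GroupNotFound`, `describeGroup`, `resetOffsets`) are hypothetical stand-ins and not the Kafka API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FallbackOnMissingGroup {
    // Hypothetical stand-in for Kafka's GroupIdNotFoundException.
    static class GroupNotFound extends RuntimeException {
        GroupNotFound(String message) {
            super(message);
        }
    }

    // Hypothetical describe call: completes with a group state,
    // or fails with GroupNotFound when the group does not exist.
    static CompletableFuture<String> describeGroup(String groupId, String stateOrNull) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (stateOrNull == null) {
            future.completeExceptionally(new GroupNotFound(groupId));
        } else {
            future.complete(stateOrNull);
        }
        return future;
    }

    // Mirrors the shape of the diff: reset when the group is EMPTY or DEAD,
    // and take the same reset path when the group is not found at all.
    static String resetOffsets(String groupId, String stateOrNull) {
        try {
            String state = describeGroup(groupId, stateOrNull).get();
            if (!(state.equals("EMPTY") || state.equals("DEAD"))) {
                return "rejected: group is not empty";
            }
            return "reset for inactive group";
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        } catch (ExecutionException ee) {
            // Future.get() wraps the failure; inspect the cause.
            if (ee.getCause() instanceof GroupNotFound) {
                return "reset for inactive group";
            }
            throw new RuntimeException(ee.getCause());
        }
    }
}
```

Under this reading, a missing group and an empty group end up on the same path, and the reset call runs at most once per invocation.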
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -793,19 +790,7 @@ public CoordinatorResult<Map.Entry<AlterShareGroupOffsetsResponseData, Initializ
String groupId,
AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequestData
) {
- List<CoordinatorRecord> records = new ArrayList<>();
- ShareGroup group = groupMetadataManager.shareGroup(groupId);
- group.validateOffsetsAlterable();
Review Comment:
I think the method `validateOffsetsAlterable()` could be removed.