[ https://issues.apache.org/jira/browse/KAFKA-19204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17947364#comment-17947364 ]
Sushant Mahajan commented on KAFKA-19204:
-----------------------------------------

```
It seems that in this case, the replay of the record should be the signal that the persisterInitialize method should be called.
```
[~schofielaj] Not sure about this - replay on a shard does not imply that replication has succeeded on all replicas. The leader shard immediately writes the record locally and replays it - replication then happens in a deferred manner.

> Timeouts in coordinator runtime operations can break share group initialization and deletion
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-19204
> URL: https://issues.apache.org/jira/browse/KAFKA-19204
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Andrew Schofield
> Assignee: Sushant Mahajan
> Priority: Major
>
> There is a class of problems in the GroupCoordinator to do with compound
> share group operations. When handling requests such as ShareGroupHeartbeat,
> the group coordinator service calls the
> `CoordinatorRuntime.scheduleWriteOperation` method, supplying an operation to
> call and a timeout. The operation returns a result and a set of records to be
> written to the __consumer_offsets topic. The coordinator runtime method
> returns a CompletableFuture to be completed when the operation and the
> writing to the topic finish.
> The problem comes when the records are written to the topic (meaning that the
> operation succeeded) but the writing of the records does not complete within
> the timeout, as seems to occur if a following replica broker is restarted at
> an inopportune moment. In this case, any chained logic which is waiting for
> the successful completion using a method such as
> CompletableFuture.thenCompose will not execute.
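
The failure mode described above can be reproduced with plain `java.util.concurrent`, without any Kafka code. This is only a minimal sketch: `timedOutWrite`, `chain` and `followUpRan` are hypothetical names invented for illustration (they are not Kafka APIs), and the timeout is simulated by completing the future exceptionally rather than by a real replication delay.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ChainedTimeoutSketch {
    // Hypothetical flag standing in for a follow-up step such as persisterInitialize.
    static final AtomicBoolean followUpRan = new AtomicBoolean(false);

    // Hypothetical stand-in for a write operation whose records were appended
    // locally, but whose future failed because the write did not complete
    // within the timeout.
    static CompletableFuture<String> timedOutWrite() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new TimeoutException("write timed out"));
        return future;
    }

    // Chains the follow-up step with thenCompose, which only runs the
    // function when the upstream future completes normally.
    static CompletableFuture<String> chain() {
        return timedOutWrite().thenCompose(result -> {
            followUpRan.set(true);
            return CompletableFuture.completedFuture(result + " initialized");
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> chained = chain();
        System.out.println("completed exceptionally: " + chained.isCompletedExceptionally());
        System.out.println("follow-up ran: " + followUpRan.get());
    }
}
```

In this sketch the lambda passed to `thenCompose` is skipped and the chained future itself completes exceptionally, so the follow-up never runs even though, in the scenario described, the records are already in the log. A stage attached with `handle` or `whenComplete` would observe the failure, but that alone does not tell the caller whether the records were in fact written.
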
> There are four methods in GroupCoordinatorService which are problematic in
> this regard:
> * shareGroupHeartbeat
> * uninitializeShareGroupState
> * performShareGroupStateMetadataInitialize
> * deleteShareGroups
> Let's look at the first of these, shareGroupHeartbeat.
> The write operation calls GroupCoordinatorRuntime.shareGroupHeartbeat. That
> does the processing for the heartbeat and returns the records which will form
> the persistent updates to remember the heartbeat's effects. This might
> include a ShareGroupStatePartitionMetadataRecord if it is necessary to ask
> the ShareCoordinator to initialize a new partition.
> So far, so good. But the GC expects to call
> GroupCoordinatorService.persisterInitialize when the operation completes
> successfully. However, if the operation completes successfully and the
> records are added to the __consumer_offsets topic, but THEN the replication
> of the records does not complete within the timeout, the call to
> persisterInitialize will not occur.
> Subsequently, the writing of the records may complete, which drives the
> replay logic, but that does not initiate the persisterInitialize either. So
> the initializing partitions remain in limbo.
> It seems that in this case, the replay of the record should be the signal
> that the persisterInitialize method should be called.
> The uninitializeShareGroupState and performShareGroupStateMetadataInitialize
> methods have a similar problem, but a less tricky one I think, because the
> action to be performed after completion is more amenable to mild
> restructuring.
> The final case, deleteGroups, looks like it needs more work, but that's
> potentially just allowing the persister's deletion to be deferred until the
> replay of the ShareGroupStatePartitionMetadataRecord containing the
> deletingTopics field.
> Thanks to [~cwadhwa] for digging into this as a result of his multi-broker
> system tests.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)