Andrew Schofield created KAFKA-19204:
----------------------------------------
Summary: Timeouts in coordinator runtime operations can break share group initialization and deletion
Key: KAFKA-19204
URL: https://issues.apache.org/jira/browse/KAFKA-19204
Project: Kafka
Issue Type: Sub-task
Reporter: Andrew Schofield
Assignee: Sushant Mahajan

There is a class of problems in the GroupCoordinator to do with compound share group operations. When handling requests such as ShareGroupHeartbeat, the group coordinator service calls the `CoordinatorRuntime.scheduleWriteOperation` method, supplying an operation to call and a timeout. The operation returns a result and a set of records to be written to the __consumer_offsets topic. The coordinator runtime method returns a CompletableFuture which is completed when both the operation and the write to the topic have finished.

The problem comes when the records are appended to the topic (meaning that the operation succeeded) but the write does not complete within the timeout, as seems to occur if a following replica broker is restarted at an inopportune moment. In this case, the future fails with a timeout, so any chained logic which is waiting for successful completion using a method such as CompletableFuture.thenCompose will not execute.

There are four methods in GroupCoordinatorService which are problematic in this regard:

* shareGroupHeartbeat
* uninitializeShareGroupState
* performShareGroupStateMetadataInitialize
* deleteShareGroups

Let's look at the first of these, shareGroupHeartbeat. The write operation calls GroupCoordinatorRuntime.shareGroupHeartbeat. That does the processing for the heartbeat and returns the records which will form the persistent updates to remember the heartbeat's effects. This might include a ShareGroupStatePartitionMetadataRecord if it is necessary to ask the ShareCoordinator to initialize a new partition. So far, so good. But the GC expects to call GroupCoordinatorService.persisterInitialize when the operation completes successfully.
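The failure mode described above can be illustrated in isolation with a plain CompletableFuture. This is only a minimal sketch, not Kafka code: the class `WriteTimeoutSketch`, the `Outcome` holder and the string result are hypothetical stand-ins, and the timeout is simulated by completing the future exceptionally by hand rather than through the coordinator runtime. It assumes the runtime fails the future with a TimeoutException when the write does not finish in time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, self-contained illustration; none of these names exist in Kafka.
public class WriteTimeoutSketch {

    // Simple holder for what happened during the simulation.
    public static final class Outcome {
        public final boolean followUpRan;            // did the thenCompose stage run?
        public final boolean chainedFailed;          // did the chained future fail?
        public final boolean timeoutWon;             // did the timeout complete the future first?
        public final boolean lateCompletionAccepted; // was the late success accepted?

        Outcome(boolean followUpRan, boolean chainedFailed,
                boolean timeoutWon, boolean lateCompletionAccepted) {
            this.followUpRan = followUpRan;
            this.chainedFailed = chainedFailed;
            this.timeoutWon = timeoutWon;
            this.lateCompletionAccepted = lateCompletionAccepted;
        }
    }

    public static Outcome simulate() {
        // Stand-in for the future returned by a scheduled write operation.
        CompletableFuture<String> writeFuture = new CompletableFuture<>();
        AtomicBoolean followUpRan = new AtomicBoolean(false);

        // Stand-in for the chained follow-up (persisterInitialize in the issue text).
        CompletableFuture<String> chained = writeFuture.thenCompose(result -> {
            followUpRan.set(true);
            return CompletableFuture.completedFuture(result);
        });

        // The timeout fires before replication of the records has finished.
        boolean timeoutWon = writeFuture.completeExceptionally(
            new TimeoutException("write did not complete within the timeout"));

        // Replication finishes later, but the future is already completed,
        // so this call has no effect and the follow-up is never triggered.
        boolean lateCompletionAccepted = writeFuture.complete("heartbeat-response");

        return new Outcome(followUpRan.get(), chained.isCompletedExceptionally(),
            timeoutWon, lateCompletionAccepted);
    }

    public static void main(String[] args) {
        Outcome o = simulate();
        System.out.println("follow-up ran: " + o.followUpRan);
        System.out.println("chained future failed: " + o.chainedFailed);
        System.out.println("late completion accepted: " + o.lateCompletionAccepted);
    }
}
```

In this sketch the follow-up stage is skipped even though the simulated write eventually succeeds, because a CompletableFuture can only be completed once and dependent stages of thenCompose run only on normal completion.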
However, if the operation completes successfully and the records are appended to the __consumer_offsets topic, but the replication of the records then does not complete within the timeout, the call to persisterInitialize will not occur. Subsequently, the writing of the records may complete, which drives the replay logic, but that does not initiate persisterInitialize either. So the initializing partitions remain in limbo. It seems that in this case, the replay of the record should be the signal that the persisterInitialize method should be called.

The uninitializeShareGroupState and performShareGroupStateMetadataInitialize methods have a similar problem, but a less tricky one, I think, because the action to be performed after completion is more amenable to mild restructuring.

The final case, deleteShareGroups, looks like it needs more work, but that potentially just means allowing the persister's deletion to be deferred until the replay of the ShareGroupStatePartitionMetadataRecord containing the deletingTopics field.

Thanks to [~cwadhwa] for digging into this as a result of his multi-broker system tests.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)