Andrew Schofield created KAFKA-19204:
----------------------------------------

             Summary: Timeouts in coordinator runtime operations can break 
share group initialization and deletion
                 Key: KAFKA-19204
                 URL: https://issues.apache.org/jira/browse/KAFKA-19204
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Andrew Schofield
            Assignee: Sushant Mahajan


There is a class of problems in the GroupCoordinator to do with compound share 
group operations. When handling requests such as ShareGroupHeartbeat, the group 
coordinator service calls the `CoordinatorRuntime.scheduleWriteOperation` 
method, supplying an operation to call and a timeout. The operation returns a 
result and a set of records to be written to the __consumer_offsets topic. The 
coordinator runtime method returns a CompletableFuture that completes when both 
the operation and the write to the topic have finished.

The problem comes when the records are appended to the topic (meaning that the 
operation itself succeeded) but the write does not complete within the timeout, 
as seems to occur if a broker hosting a follower replica is restarted at an 
inopportune moment. In this case, the future is not completed successfully, so 
any chained logic which is waiting for successful completion using a method 
such as CompletableFuture.thenCompose will not execute.
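
To illustrate the effect, here is a minimal standalone sketch using only the JDK. It does not use the Kafka classes. The class name, the `followUpRan` helper and the use of `java.util.concurrent.TimeoutException` are assumptions made for the demonstration, and the function passed to thenCompose stands in for follow-up work such as persisterInitialize.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeoutSkipsChainedStage {

    // Returns true if the stage chained with thenCompose actually ran.
    static boolean followUpRan(CompletableFuture<String> writeFuture) {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<String> chained = writeFuture.thenCompose(result -> {
            ran.set(true); // hypothetical stand-in for the follow-up call
            return CompletableFuture.completedFuture(result);
        });
        // Wait for the chain to settle without rethrowing any failure.
        chained.handle((result, error) -> null).join();
        return ran.get();
    }

    public static void main(String[] args) {
        // Write completed within the timeout: the follow-up runs.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("response");

        // Write timed out: the future fails, so the follow-up is skipped,
        // even if the records later become durable.
        CompletableFuture<String> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new TimeoutException("write timed out"));

        System.out.println(followUpRan(ok));       // prints "true"
        System.out.println(followUpRan(timedOut)); // prints "false"
    }
}
```

A stage attached with handle or whenComplete would still run in the failed case, but it cannot tell from the exception alone whether the records eventually reached the topic.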

There are four methods in GroupCoordinatorService which are problematic in this 
regard:
 * shareGroupHeartbeat
 * uninitializeShareGroupState
 * performShareGroupStateMetadataInitialize
 * deleteShareGroups

Let's look at the first of these, shareGroupHeartbeat.

The write operation calls GroupCoordinatorShard.shareGroupHeartbeat. That 
does the processing for the heartbeat and returns the records which will form 
the persistent updates to remember the heartbeat's effects. This might include 
a ShareGroupStatePartitionMetadataRecord if it is necessary to ask the 
ShareCoordinator to initialize a new partition.

So far, so good. But the group coordinator expects to call 
GroupCoordinatorService.persisterInitialize when the operation completes 
successfully. If the operation itself succeeds and the records are appended to 
the __consumer_offsets topic, but the replication of the records then does not 
complete within the timeout, the call to persisterInitialize will not occur.

Subsequently, the writing of the records may complete, which drives the replay 
logic, but that does not initiate the persisterInitialize either. So the 
initializing partitions remain in limbo.

It seems that in this case, the replay of the record should be the signal that 
the persisterInitialize method should be called.
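
A rough sketch of that idea follows, purely as an illustration and not the actual coordinator code. The `Persister` interface, the `replayInitializing` hook and the partition naming are all hypothetical. The point is only that the trigger hangs off the replay and is idempotent, so it does not matter whether the original request's future timed out or whether a record is replayed more than once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReplayDrivenInitialize {

    // Hypothetical stand-in for the call that asks the share coordinator
    // to initialize a partition.
    interface Persister {
        void initialize(String topicPartition);
    }

    private final Persister persister;
    // Partitions for which initialization has already been requested.
    private final Set<String> requested = new HashSet<>();

    ReplayDrivenInitialize(Persister persister) {
        this.persister = persister;
    }

    // Hypothetical replay hook: invoked with the initializing partitions
    // carried by a replayed metadata record.
    void replayInitializing(List<String> initializingPartitions) {
        for (String tp : initializingPartitions) {
            // Set.add returns false for duplicates, so each partition
            // triggers at most one initialize request.
            if (requested.add(tp)) {
                persister.initialize(tp);
            }
        }
    }

    // Replays the same record twice and returns the initialize calls made.
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        ReplayDrivenInitialize shard = new ReplayDrivenInitialize(calls::add);
        shard.replayInitializing(List.of("T1-0", "T1-1"));
        shard.replayInitializing(List.of("T1-0", "T1-1")); // replayed again
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[T1-0, T1-1]"
    }
}
```

A real fix would also need to decide which broker acts on the replay and how failures of the initialize call are retried. The sketch leaves both out.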

The uninitializeShareGroupState and performShareGroupStateMetadataInitialize 
methods have a similar problem, but a less tricky one, I think, because the 
action to be performed after completion is more amenable to mild restructuring.

The final case, deleteShareGroups, looks like it needs more work, but that 
potentially just means allowing the persister's deletion to be deferred until 
the replay of the ShareGroupStatePartitionMetadataRecord containing the 
deletingTopics field.

Thanks to [~cwadhwa] for digging into this as a result of his multi-broker 
system tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
