[ 
https://issues.apache.org/jira/browse/KAFKA-19204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17947364#comment-17947364
 ] 

Sushant Mahajan commented on KAFKA-19204:
-----------------------------------------

```

It seems that in this case, the replay of the record should be the signal that 
the persisterInitialize method should be called.

```

[~schofielaj] 

Not sure about this - replay on a shard does not imply that replication has 
succeeded on all replicas. The leader shard immediately writes the record 
locally and replays it; replication then happens in a deferred manner.
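
To make the distinction concrete, here is a toy model of that ordering. It is 
not Kafka code: `ToyShard`, `write` and `onReplicated` are hypothetical names 
invented for this sketch, and it only assumes the behaviour described above 
(replay at local append, commit signalled later once replication succeeds).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReplayVsCommitSketch {

    /** Toy stand-in for a coordinator shard on the leader; not Kafka code. */
    static class ToyShard {
        final List<String> replayedState = new ArrayList<>();
        final List<CompletableFuture<Void>> pending = new ArrayList<>();

        /** Appends locally and replays at once; the future tracks replication. */
        CompletableFuture<Void> write(String record) {
            replayedState.add(record); // replay happens here, before replication
            CompletableFuture<Void> committed = new CompletableFuture<>();
            pending.add(committed);
            return committed;
        }

        /** Simulates the followers catching up at some later point. */
        void onReplicated() {
            pending.forEach(f -> f.complete(null));
            pending.clear();
        }
    }

    public static void main(String[] args) {
        ToyShard shard = new ToyShard();
        CompletableFuture<Void> committed =
            shard.write("ShareGroupStatePartitionMetadataRecord");

        // The record has been replayed, but it is not yet replicated.
        System.out.println("replayed=" + shard.replayedState.size()
            + " committed=" + committed.isDone());

        shard.onReplicated();
        System.out.println("replayed=" + shard.replayedState.size()
            + " committed=" + committed.isDone());
    }
}
```

In this model, anything triggered from replay would run while `committed` is 
still incomplete, which is the concern with using replay as the signal.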

> Timeouts in coordinator runtime operations can break share group 
> initialization and deletion
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19204
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19204
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Andrew Schofield
>            Assignee: Sushant Mahajan
>            Priority: Major
>
> There is a class of problems in the GroupCoordinator to do with compound 
> share group operations. When handling requests such as ShareGroupHeartbeat, 
> the group coordinator service calls the 
> `CoordinatorRuntime.scheduleWriteOperation` method, supplying an operation to 
> call and a timeout. The operation returns a result and a set of records to be 
> written to the __consumer_offsets topic. The coordinator runtime method 
> returns a CompletableFuture to be completed when the operation and the 
> writing to the topic finish.
> The problem comes when the records are written to the topic (meaning that the 
> operation succeeded) but the replication of the records does not complete 
> within the timeout, as seems to occur if a following replica broker is restarted at 
> an inopportune moment. In this case, any chained logic which is waiting for 
> the successful completion using a method such as 
> CompletableFuture.thenCompose will not execute.
> There are four methods in GroupCoordinatorService which are problematic in 
> this regard:
>  * shareGroupHeartbeat
>  * uninitializeShareGroupState
>  * performShareGroupStateMetadataInitialize
>  * deleteShareGroups
> Let's look at the first of these, shareGroupHeartbeat.
> The write operation calls GroupCoordinatorRuntime.shareGroupHeartbeat. That 
> does the processing for the heartbeat and returns the records which will form 
> the persistent updates to remember the heartbeat's effects. This might 
> include a ShareGroupStatePartitionMetadataRecord if it is necessary to ask 
> the ShareCoordinator to initialize a new partition.
> So far, so good. But the GC expects to call 
> GroupCoordinatorService.persisterInitialize when the operation completes 
> successfully. However, if the operation completes successfully and the 
> records are added to the __consumer_offsets topic, but THEN the replication 
> of the records does not complete within the timeout, the call to 
> persisterInitialize will not occur.
> Subsequently, the writing of the records may complete, which drives the 
> replay logic, but that does not initiate the persisterInitialize either. So 
> the initializing partitions remain in limbo.
> It seems that in this case, the replay of the record should be the signal 
> that the persisterInitialize method should be called.
> The uninitializeShareGroupState and performShareGroupStateMetadataInitialize 
> methods have a similar problem, but a less tricky one, I think, because the 
> action to be performed after completion is more amenable to mild restructuring.
> The final case, deleteShareGroups, looks like it needs more work, but that's 
> potentially just allowing the persister's deletion to be deferred until the 
> replay of the ShareGroupStatePartitionMetadataRecord containing the 
> deletingTopics field.
> Thanks to [~cwadhwa] for digging into this as a result of his multi-broker 
> system tests.
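
The chaining problem in the description above can be reproduced with plain 
`CompletableFuture`, independently of Kafka. The sketch below is only an 
illustration: `followUpRan` is a hypothetical helper, and the timeout is 
simulated by completing the future exceptionally with a `TimeoutException` 
rather than by a real coordinator runtime.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeoutChainSketch {

    /** Returns true if the follow-up chained with thenCompose actually ran. */
    static boolean followUpRan(boolean timeOut) {
        AtomicBoolean ran = new AtomicBoolean(false);

        // Stands in for the future returned by the write operation.
        CompletableFuture<String> write = new CompletableFuture<>();

        // Stands in for a follow-up such as persisterInitialize.
        CompletableFuture<String> chained = write.thenCompose(result -> {
            ran.set(true);
            return CompletableFuture.completedFuture(result);
        });

        if (timeOut) {
            // Simulated timeout: the future fails even if the records land later.
            write.completeExceptionally(new TimeoutException("write timed out"));
        } else {
            write.complete("ok");
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("follow-up ran on success: " + followUpRan(false));
        System.out.println("follow-up ran on timeout: " + followUpRan(true));
    }
}
```

On the timeout path the `thenCompose` function is skipped, so nothing 
downstream learns that the records may still be written successfully later.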



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
