[ 
https://issues.apache.org/jira/browse/KAFKA-14075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-14075:
-----------------------------
    Description: 
In 
[GroupMetadata.removeAllOffsets()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L729-L740]
 we pass in the offsets cache to decide which pendingTransactionalOffsetCommits 
to delete upon group deletion. As a result, only pending transactional offset 
commits for topic partitions already in the offsets cache are deleted.

However, we add a transactional offset commit to the offsets cache only after 
the commit/abort marker is written to the log in 
[GroupMetadata.completePendingTxnOffsetCommit()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L692].

So even after a group deletion we can still have pending transactional offset 
commits for a group that's supposed to be deleted. The group metadata manager 
will throw an IllegalStateException 
[here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L740]
 while loading the group into memory. We will hit this exception on every load 
of the group for as long as the hanging transaction does not complete.

We should delete all pending transactional offset commits (instead of only 
those for topic partitions that exist in the offsets cache) when a group is 
deleted in GroupMetadata.removeOffsets().
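
The difference between the current and the proposed cleanup can be sketched with a small model. This is only an illustration, not the actual Kafka code: GroupModel, removeAllOffsetsCurrent() and removeAllOffsetsProposed() are hypothetical names, topic partitions are reduced to strings, and offsets to plain longs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified stand-in for GroupMetadata (assumption, not Kafka code).
class GroupModel {
    // Offsets cache: topic partition -> committed offset.
    final Map<String, Long> offsets = new HashMap<>();
    // Pending transactional offset commits: producerId -> (topic partition -> offset).
    final Map<Long, Map<String, Long>> pendingTxnOffsets = new HashMap<>();

    // Records a transactional offset commit whose commit/abort marker is not written yet.
    void prepareTxnOffsetCommit(long producerId, String partition, long offset) {
        pendingTxnOffsets.computeIfAbsent(producerId, id -> new HashMap<>()).put(partition, offset);
    }

    // A pending commit reaches the offsets cache only once the marker is written.
    void completePendingTxnOffsetCommit(long producerId, boolean isCommit) {
        Map<String, Long> pending = pendingTxnOffsets.remove(producerId);
        if (isCommit && pending != null) {
            offsets.putAll(pending);
        }
    }

    // Behavior described in the report: only pending commits for partitions
    // that are already in the offsets cache are removed.
    void removeAllOffsetsCurrent() {
        Set<String> cached = new HashSet<>(offsets.keySet());
        for (Map<String, Long> pending : pendingTxnOffsets.values()) {
            pending.keySet().removeAll(cached);
        }
        pendingTxnOffsets.values().removeIf(Map::isEmpty);
        offsets.clear();
    }

    // Proposed behavior: drop every pending transactional offset commit.
    void removeAllOffsetsProposed() {
        pendingTxnOffsets.clear();
        offsets.clear();
    }

    boolean hasPendingTxnOffsets() {
        return !pendingTxnOffsets.isEmpty();
    }
}

class GroupDeletionDemo {
    // Builds a group with one cached offset and one pending commit for an uncached partition.
    static GroupModel groupWithHangingTxn() {
        GroupModel group = new GroupModel();
        group.offsets.put("topic-0", 5L);
        group.prepareTxnOffsetCommit(1L, "topic-1", 10L);
        return group;
    }

    public static void main(String[] args) {
        GroupModel current = groupWithHangingTxn();
        current.removeAllOffsetsCurrent();
        System.out.println("current cleanup leaves pending commits: " + current.hasPendingTxnOffsets());

        GroupModel proposed = groupWithHangingTxn();
        proposed.removeAllOffsetsProposed();
        System.out.println("proposed cleanup leaves pending commits: " + proposed.hasPendingTxnOffsets());
    }
}
```

In this model the pending commit for "topic-1" survives the current cleanup because "topic-1" never reached the offsets cache, which corresponds to the leftover state that trips the IllegalStateException on load.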

To mitigate, users can abort all hanging transactions for the consumer offsets 
partition via the CLI.

  was:
In 
[GroupMetadata.removeAllOffsets()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L729-L740]
 we pass in the offsets cache to decide which pendingTransactionalOffsetCommits 
to delete upon group deletion. As a result, only pending transactional offset 
commits for topic partitions already in the offsets cache are deleted.

However, we add a transactional offset commit to the offsets cache only after 
the commit/abort marker is written to the log in 
[GroupMetadata.completePendingTxnOffsetCommit()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L692].

So even after a group deletion we can still have pending transactional offset 
commits for a group that's supposed to be deleted. The group metadata manager 
will throw an IllegalStateException 
[here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L740]
 while loading the group into memory. We will hit this exception on every load 
of the group for as long as the hanging transaction does not complete.

We should delete all pending transactional offset commits (instead of only 
those for topic partitions that exist in the offsets cache) when a group is 
deleted in GroupMetadata.removeOffsets().


> Consumer Group deletion does not delete pending transactional offset commits
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-14075
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14075
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jeff Kim
>            Assignee: Jeff Kim
>            Priority: Major
>
> In 
> [GroupMetadata.removeAllOffsets()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L729-L740]
>  we pass in the offsets cache to decide which 
> pendingTransactionalOffsetCommits to delete upon group deletion. As a result, 
> only pending transactional offset commits for topic partitions already in the 
> offsets cache are deleted.
> However, we add a transactional offset commit to the offsets cache only after 
> the commit/abort marker is written to the log in 
> [GroupMetadata.completePendingTxnOffsetCommit()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L692].
> So even after a group deletion we can still have pending transactional offset 
> commits for a group that's supposed to be deleted. The group metadata manager 
> will throw an IllegalStateException 
> [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L740]
>  while loading the group into memory. We will hit this exception on every 
> load of the group for as long as the hanging transaction does not complete.
> We should delete all pending transactional offset commits (instead of only 
> those for topic partitions that exist in the offsets cache) when a group is 
> deleted in GroupMetadata.removeOffsets().
> To mitigate, users can abort all hanging transactions for the consumer offsets 
> partition via the CLI.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
