zou shengfu created KAFKA-14832:
-----------------------------------

             Summary: Thread unsafe for GroupMetadata
                 Key: KAFKA-14832
                 URL: https://issues.apache.org/jira/browse/KAFKA-14832
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 3.3.2
            Reporter: zou shengfu
            Assignee: zou shengfu


          groupManager.storeGroup(group, groupAssignment, error => {
            if (error != Errors.NONE) {
              warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")

              // Failed to persist member.id of the given static member, revert 
the update of the static member in the group.
              group.updateMember(knownStaticMember, oldProtocols, 
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
              val oldMember = group.replaceStaticMember(groupInstanceId, 
newMemberId, oldMemberId)
              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
              responseCallback(JoinGroupResult(
                List.empty,
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = error
              ))
            } else if (supportSkippingAssignment) {
              // Starting from version 9 of the JoinGroup API, static members 
are able to
              // skip running the assignor based on the `SkipAssignment` field. 
We leverage
              // this to tell the leader that it is the leader of the group but 
by skipping
              // running the assignor while the group is in stable state.
              // Notes:
              // 1) This allows the leader to continue monitoring metadata 
changes for the
              // group. Note that any metadata changes happening while the 
static leader is
              // down won't be noticed.
              // 2) The assignors are not idempotent nor free from side 
effects. This is why
              // we skip entirely the assignment step as it could generate a 
different group
              // assignment which would be ignored by the group coordinator 
because the group
              // is the stable state.
              val isLeader = group.isLeader(newMemberId)
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = if (isLeader) {
                  group.currentMemberMetadata
                } else {
                  List.empty
                },
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                skipAssignment = isLeader,
                error = Errors.NONE
              ))
            } else {
              // Prior to version 9 of the JoinGroup API, we wanted to avoid 
current leader
              // performing trivial assignment while the group is in stable 
stage, because
              // the new assignment in leader's next sync call won't be 
broadcast by a stable group.
              // This could be guaranteed by always returning the old leader id 
so that the current
              // leader won't assume itself as a leader based on the returned 
message, since the new
              // member.id won't match returned leader id, therefore no 
assignment will be performed.
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = List.empty,
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = Errors.NONE
              ))
            }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to