[jira] [Created] (KAFKA-16351) add groupid in fetch request
zou shengfu created KAFKA-16351:
-----------------------------------

             Summary: add groupid in fetch request
                 Key: KAFKA-16351
                 URL: https://issues.apache.org/jira/browse/KAFKA-16351
             Project: Kafka
          Issue Type: Improvement
          Components: clients, core
            Reporter: zou shengfu


Why does the fetch request not carry a group id field? If the fetch request contained the group id, the Kafka broker could first measure the traffic of each consumer group that consumes the same topic, and then limit the traffic of each of those groups. Will a group id field be added to the fetch request in the future?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
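The measure-then-limit idea in this request can be illustrated with a minimal sketch. It assumes the broker could read a group id from each fetch request, which is exactly what the issue asks for and is not something the source confirms exists. The class GroupFetchQuota and all of its methods are hypothetical names for illustration only, not Kafka APIs, and the fixed accounting window is a simplification.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-group fetch accounting; illustration only, not part of Kafka. */
class GroupFetchQuota {
    private final long maxBytesPerWindow;
    private final Map<String, LongAdder> bytesByGroup = new ConcurrentHashMap<>();

    GroupFetchQuota(long maxBytesPerWindow) {
        this.maxBytesPerWindow = maxBytesPerWindow;
    }

    /** Step 1 (measure): add the bytes served to this group in the current window. */
    void record(String groupId, long bytes) {
        bytesByGroup.computeIfAbsent(groupId, g -> new LongAdder()).add(bytes);
    }

    /** Bytes served to the group so far in the current window (0 if never seen). */
    long bytesFor(String groupId) {
        LongAdder counter = bytesByGroup.get(groupId);
        return counter == null ? 0L : counter.sum();
    }

    /** Step 2 (limit): true once the group has exceeded its byte budget for the window. */
    boolean overQuota(String groupId) {
        return bytesFor(groupId) > maxBytesPerWindow;
    }

    /** Starts a new accounting window by dropping all counters. */
    void resetWindow() {
        bytesByGroup.clear();
    }
}
```

In this sketch a fetch handler would call record(groupId, responseBytes) after each response and consult overQuota(groupId) before serving the next one. How a real broker would delay or reject an over-quota group is left open here.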
[jira] [Created] (KAFKA-14885) Client can connect to broker and broker can not connect zookeeper
zou shengfu created KAFKA-14885:
-----------------------------------

             Summary: Client can connect to broker and broker can not connect zookeeper
                 Key: KAFKA-14885
                 URL: https://issues.apache.org/jira/browse/KAFKA-14885
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 3.3.2
            Reporter: zou shengfu
            Assignee: zou shengfu
[jira] [Created] (KAFKA-14849) Kafka consumers receive INCONSISTENT_GROUP_PROTOCOL error even the configuration of all consumers are same
zou shengfu created KAFKA-14849:
-----------------------------------

             Summary: Kafka consumers receive INCONSISTENT_GROUP_PROTOCOL error even the configuration of all consumers are same
                 Key: KAFKA-14849
                 URL: https://issues.apache.org/jira/browse/KAFKA-14849
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 2.5.1
            Reporter: zou shengfu
[jira] [Resolved] (KAFKA-14832) Thread unsafe for GroupMetadata when persisting metadata
     [ https://issues.apache.org/jira/browse/KAFKA-14832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zou shengfu resolved KAFKA-14832.
---------------------------------
    Resolution: Abandoned

> Thread unsafe for GroupMetadata when persisting metadata
> ---------------------------------------------------------
>
>                 Key: KAFKA-14832
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14832
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.3.2
>            Reporter: zou shengfu
>            Assignee: zou shengfu
>            Priority: Major
>
> {code:java}
> groupManager.storeGroup(group, groupAssignment, error => {
>   if (error != Errors.NONE) {
>     warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
>     // Failed to persist member.id of the given static member, revert the update of the static member in the group.
>     group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
>     val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId) }{code}
> Should we add a lock to protect group?
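The question about protecting the group with a lock can be made concrete with a small sketch. It does not reproduce Kafka's GroupMetadata or GroupCoordinator. GuardedGroup, inLock, and onPersistResult are hypothetical names, and the sketch assumes the persist callback may run on a different thread from the one that performed the original update. Whether the real callback already runs under the group lock cannot be told from the quoted snippet.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical stand-in for mutable group state; not Kafka's GroupMetadata. */
class GuardedGroup {
    private final ReentrantLock lock = new ReentrantLock();
    private String staticMemberId;

    GuardedGroup(String staticMemberId) {
        this.staticMemberId = staticMemberId;
    }

    /** Runs the action while holding the group lock and returns its result. */
    <T> T inLock(Supplier<T> action) {
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }

    /** Swaps in a new member id under the lock and returns the previous one. */
    String replaceStaticMember(String newMemberId) {
        return inLock(() -> {
            String old = staticMemberId;
            staticMemberId = newMemberId;
            return old;
        });
    }

    /** Reads the current member id under the lock. */
    String memberId() {
        return inLock(() -> staticMemberId);
    }

    /** Persist callback: on failure, reverts to the old member id under the lock. */
    void onPersistResult(boolean failed, String oldMemberId) {
        if (failed) {
            replaceStaticMember(oldMemberId);
        }
    }
}
```

Because every read and write of the member id goes through inLock, a revert issued from the callback cannot interleave with another update to the same group. ReentrantLock is used so that a caller that already holds the lock can call these methods without deadlocking.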
[jira] [Created] (KAFKA-14832) Thread unsafe for GroupMetadata
zou shengfu created KAFKA-14832:
-----------------------------------

             Summary: Thread unsafe for GroupMetadata
                 Key: KAFKA-14832
                 URL: https://issues.apache.org/jira/browse/KAFKA-14832
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 3.3.2
            Reporter: zou shengfu
            Assignee: zou shengfu


groupManager.storeGroup(group, groupAssignment, error => {
  if (error != Errors.NONE) {
    warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")

    // Failed to persist member.id of the given static member, revert the update of the static member in the group.
    group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
    val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)
    completeAndScheduleNextHeartbeatExpiration(group, oldMember)
    responseCallback(JoinGroupResult(
      List.empty,
      memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
      generationId = group.generationId,
      protocolType = group.protocolType,
      protocolName = group.protocolName,
      leaderId = currentLeader,
      skipAssignment = false,
      error = error
    ))
  } else if (supportSkippingAssignment) {
    // Starting from version 9 of the JoinGroup API, static members are able to
    // skip running the assignor based on the `SkipAssignment` field. We leverage
    // this to tell the leader that it is the leader of the group but by skipping
    // running the assignor while the group is in stable state.
    // Notes:
    // 1) This allows the leader to continue monitoring metadata changes for the
    // group. Note that any metadata changes happening while the static leader is
    // down won't be noticed.
    // 2) The assignors are not idempotent nor free from side effects. This is why
    // we skip entirely the assignment step as it could generate a different group
    // assignment which would be ignored by the group coordinator because the group
    // is the stable state.
    val isLeader = group.isLeader(newMemberId)
    group.maybeInvokeJoinCallback(member, JoinGroupResult(
      members = if (isLeader) {
        group.currentMemberMetadata
      } else {
        List.empty
      },
      memberId = newMemberId,
      generationId = group.generationId,
      protocolType = group.protocolType,
      protocolName = group.protocolName,
      leaderId = group.leaderOrNull,
      skipAssignment = isLeader,
      error = Errors.NONE
    ))
  } else {
    // Prior to version 9 of the JoinGroup API, we wanted to avoid current leader
    // performing trivial assignment while the group is in stable stage, because
    // the new assignment in leader's next sync call won't be broadcast by a stable group.
    // This could be guaranteed by always returning the old leader id so that the current
    // leader won't assume itself as a leader based on the returned message, since the new
    // member.id won't match returned leader id, therefore no assignment will be performed.
    group.maybeInvokeJoinCallback(member, JoinGroupResult(
      members = List.empty,
      memberId = newMemberId,
      generationId = group.generationId,
      protocolType = group.protocolType,
      protocolName = group.protocolName,
      leaderId = currentLeader,
      skipAssignment = false,
      error = Errors.NONE
    ))
  }
[jira] [Created] (KAFKA-14009) Consumers do not reduce rebalancetimeout when consumer use static membership
zou shengfu created KAFKA-14009:
-----------------------------------

             Summary: Consumers do not reduce rebalancetimeout when consumer use static membership
                 Key: KAFKA-14009
                 URL: https://issues.apache.org/jira/browse/KAFKA-14009
             Project: Kafka
          Issue Type: Bug
          Components: consumer, core
    Affects Versions: 2.6.1
            Reporter: zou shengfu
[jira] [Created] (KAFKA-12169) Consumer can not know paritions chage when client leader restart with static membership protocol
zou shengfu created KAFKA-12169:
-----------------------------------

             Summary: Consumer can not know paritions chage when client leader restart with static membership protocol
                 Key: KAFKA-12169
                 URL: https://issues.apache.org/jira/browse/KAFKA-12169
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 2.6.1, 2.5.1
            Reporter: zou shengfu


Background: our Kafka consumer services run on Kubernetes, and the services often restart because of operations work.

When we increased the topic from 1000 to 2000 partitions, the client leader restarted with an unknown member id at the same time. We found that the consumers did not trigger a rebalance and still consumed only 1000 partitions.