[jira] [Created] (KAFKA-16351) add groupid in fetch request

2024-03-07 Thread zou shengfu (Jira)
zou shengfu created KAFKA-16351:
---

 Summary: add groupid in fetch request
 Key: KAFKA-16351
 URL: https://issues.apache.org/jira/browse/KAFKA-16351
 Project: Kafka
  Issue Type: Improvement
  Components: clients, core
Reporter: zou shengfu


Why do we not include a group id field in the fetch request? If the fetch 
request contained the group id, the Kafka broker could first measure the 
traffic of every consumer that consumes the same topic, and then limit the 
traffic of each consumer. Will the group id field be added to the fetch 
request in the future?
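
A minimal sketch of the accounting described above, assuming the broker could read a group id from each fetch request. Every name here (`GroupFetchMeter`, `record`, `bytesFor`, `overQuota`, `quotaBytes`) is hypothetical and not part of Kafka. It only keeps a running byte total per group; a real limit would presumably be measured over a time window.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

// Hypothetical broker-side helper; none of these names exist in Kafka.
final class GroupFetchMeter(quotaBytes: Long) {
  // Running total of bytes returned to fetches, keyed by consumer group id.
  private val bytesByGroup = new ConcurrentHashMap[String, LongAdder]()

  /** Record the bytes returned for a fetch that carried the given group id. */
  def record(groupId: String, bytes: Long): Unit =
    bytesByGroup.computeIfAbsent(groupId, _ => new LongAdder).add(bytes)

  /** Total bytes recorded so far for the group (0 if the group is unknown). */
  def bytesFor(groupId: String): Long =
    Option(bytesByGroup.get(groupId)).map(_.sum()).getOrElse(0L)

  /** True once the group's running total exceeds the configured quota. */
  def overQuota(groupId: String): Boolean =
    bytesFor(groupId) > quotaBytes
}
```

With such a meter, two groups reading the same topic would be tracked separately, so one group could be limited without affecting the other.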



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14885) Client can connect to broker and broker can not connect zookeeper

2023-04-10 Thread zou shengfu (Jira)
zou shengfu created KAFKA-14885:
---

 Summary: Client can connect to broker and broker can not connect 
zookeeper
 Key: KAFKA-14885
 URL: https://issues.apache.org/jira/browse/KAFKA-14885
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.3.2
Reporter: zou shengfu
Assignee: zou shengfu








[jira] [Created] (KAFKA-14849) Kafka consumers receive INCONSISTENT_GROUP_PROTOCOL error even the configuration of all consumers are same

2023-03-26 Thread zou shengfu (Jira)
zou shengfu created KAFKA-14849:
---

 Summary: Kafka consumers receive INCONSISTENT_GROUP_PROTOCOL error 
even the configuration of all consumers are same
 Key: KAFKA-14849
 URL: https://issues.apache.org/jira/browse/KAFKA-14849
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.5.1
Reporter: zou shengfu








[jira] [Resolved] (KAFKA-14832) Thread unsafe for GroupMetadata when persisting metadata

2023-03-26 Thread zou shengfu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zou shengfu resolved KAFKA-14832.
-
Resolution: Abandoned

> Thread unsafe for GroupMetadata when persisting metadata 
> -
>
> Key: KAFKA-14832
> URL: https://issues.apache.org/jira/browse/KAFKA-14832
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.2
>Reporter: zou shengfu
>Assignee: zou shengfu
>Priority: Major
>
> {code:java}
> groupManager.storeGroup(group, groupAssignment, error => {
>   if (error != Errors.NONE) {
>     warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
>     // Failed to persist member.id of the given static member, revert the update of the static member in the group.
>     group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
>     val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)
>   }
> {code}
> Should we add a lock to protect the group?
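
To illustrate the question, here is a minimal sketch of guarding the revert with a per-group lock. `GroupState`, `StoreCallbackSketch`, and `onStoreComplete` are hypothetical stand-ins, not Kafka's classes, and the `inLock` helper is defined here only for illustration. The sketch assumes the persistence callback may run on a different thread from the one that made the original update.

```scala
import java.util.concurrent.locks.ReentrantLock

// Simplified stand-in for a group's mutable state; not Kafka's GroupMetadata.
final class GroupState {
  private val lock = new ReentrantLock()
  // groupInstanceId -> memberId; callers are expected to mutate it under inLock.
  private var members = Map.empty[String, String]

  /** Run `body` while holding this group's lock, releasing it even on failure. */
  def inLock[T](body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  def replaceMember(instanceId: String, memberId: String): Unit =
    members = members.updated(instanceId, memberId)

  def memberId(instanceId: String): Option[String] = members.get(instanceId)
}

object StoreCallbackSketch {
  // Hypothetical completion callback of an asynchronous store operation.
  // On failure, the revert of the static member happens under the group lock.
  def onStoreComplete(group: GroupState, instanceId: String,
                      oldMemberId: String, success: Boolean): Unit =
    if (!success) group.inLock {
      group.replaceMember(instanceId, oldMemberId)
    }
}
```

Taking the lock inside the callback keeps the revert from interleaving with other updates to the same group, provided those updates also go through `inLock`.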





[jira] [Created] (KAFKA-14832) Thread unsafe for GroupMetadata

2023-03-22 Thread zou shengfu (Jira)
zou shengfu created KAFKA-14832:
---

 Summary: Thread unsafe for GroupMetadata
 Key: KAFKA-14832
 URL: https://issues.apache.org/jira/browse/KAFKA-14832
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.3.2
Reporter: zou shengfu
Assignee: zou shengfu


          groupManager.storeGroup(group, groupAssignment, error => {
            if (error != Errors.NONE) {
              warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")

              // Failed to persist member.id of the given static member, revert the update of the static member in the group.
              group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
              val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)
              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
              responseCallback(JoinGroupResult(
                List.empty,
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = error
              ))
            } else if (supportSkippingAssignment) {
              // Starting from version 9 of the JoinGroup API, static members are able to
              // skip running the assignor based on the `SkipAssignment` field. We leverage
              // this to tell the leader that it is the leader of the group but by skipping
              // running the assignor while the group is in stable state.
              // Notes:
              // 1) This allows the leader to continue monitoring metadata changes for the
              // group. Note that any metadata changes happening while the static leader is
              // down won't be noticed.
              // 2) The assignors are not idempotent nor free from side effects. This is why
              // we skip entirely the assignment step as it could generate a different group
              // assignment which would be ignored by the group coordinator because the group
              // is the stable state.
              val isLeader = group.isLeader(newMemberId)
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = if (isLeader) {
                  group.currentMemberMetadata
                } else {
                  List.empty
                },
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                skipAssignment = isLeader,
                error = Errors.NONE
              ))
            } else {
              // Prior to version 9 of the JoinGroup API, we wanted to avoid current leader
              // performing trivial assignment while the group is in stable stage, because
              // the new assignment in leader's next sync call won't be broadcast by a stable group.
              // This could be guaranteed by always returning the old leader id so that the current
              // leader won't assume itself as a leader based on the returned message, since the new
              // member.id won't match returned leader id, therefore no assignment will be performed.
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = List.empty,
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = Errors.NONE
              ))
            }





[jira] [Created] (KAFKA-14009) Consumers do not reduce rebalancetimeout when consumer use static membership

2022-06-19 Thread zou shengfu (Jira)
zou shengfu created KAFKA-14009:
---

 Summary: Consumers do not reduce rebalancetimeout when consumer 
use static membership
 Key: KAFKA-14009
 URL: https://issues.apache.org/jira/browse/KAFKA-14009
 Project: Kafka
  Issue Type: Bug
  Components: consumer, core
Affects Versions: 2.6.1
Reporter: zou shengfu








[jira] [Created] (KAFKA-12169) Consumer can not know partitions change when client leader restart with static membership protocol

2021-01-09 Thread zou shengfu (Jira)
zou shengfu created KAFKA-12169:
---

 Summary: Consumer can not know partitions change when client leader 
restart with static membership protocol
 Key: KAFKA-12169
 URL: https://issues.apache.org/jira/browse/KAFKA-12169
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.6.1, 2.5.1
Reporter: zou shengfu


Background: 
Our Kafka consumer services run on Kubernetes and are often restarted for 
operational reasons. When we increased the topic's partition count from 1000 
to 2000, the client leader restarted with an unknown member id at the same 
time. We found that the consumers did not trigger a rebalance and still 
consumed only 1000 partitions.

 


