This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 94ef25a KAFKA-9801: Still trigger rebalance when static member joins in CompletingRebalance phase (#8405) 94ef25a is described below commit 94ef25ab9128030de8692b6e690787ec8012830a Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Tue Apr 7 15:29:15 2020 -0700 KAFKA-9801: Still trigger rebalance when static member joins in CompletingRebalance phase (#8405) Fix the direct cause of the observed issue on the client side: when a heartbeat gets an error and the generation is reset, we only need to set the member state to UNJOINED if it is not already REBALANCING; otherwise, the join-group handler would throw the retriable UnjoinedGroupException, forcing the consumer to re-send the join-group request unnecessarily. Fix the root cause of the issue on the broker side: we should still trigger a rebalance when a static member joins during the CompletingRebalance phase; otherwise the member.id may already have been replaced by the time the leader's assignment is received, so the assignment computed for the old member.id no longer matches and the new member.id's assignment is empty. 
Reviewers: Boyang Chen <boy...@confluent.io>, Jason Gustafson <ja...@confluent.io> --- .../consumer/internals/AbstractCoordinator.java | 39 +++++--- .../consumer/internals/ConsumerCoordinator.java | 7 ++ .../clients/consumer/internals/Heartbeat.java | 35 +++++--- .../internals/AbstractCoordinatorTest.java | 46 ++++++++++ .../kafka/coordinator/group/GroupCoordinator.scala | 100 ++++++++++++--------- .../kafka/coordinator/group/GroupMetadata.scala | 8 +- .../kafka/api/PlaintextConsumerTest.scala | 7 +- .../coordinator/group/GroupCoordinatorTest.scala | 26 +++--- 8 files changed, 183 insertions(+), 85 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index af67ab5..66da319 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -451,8 +451,9 @@ public abstract class AbstractCoordinator implements Closeable { return false; } } else { - resetJoinGroupFuture(); final RuntimeException exception = future.exception(); + log.info("Join group failed with {}", exception.toString()); + resetJoinGroupFuture(); if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException || @@ -889,17 +890,26 @@ public abstract class AbstractCoordinator implements Closeable { } private synchronized void resetGeneration() { + this.rejoinNeeded = true; this.generation = Generation.NO_GENERATION; - resetStateAndRejoin(); } synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) { - log.debug("Resetting generation after encountering {} from {} response", error, api); + log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api); + + // only 
reset the state to un-joined when it is not already in rebalancing + if (state != MemberState.REBALANCING) + state = MemberState.UNJOINED; + resetGeneration(); } synchronized void resetGenerationOnLeaveGroup() { log.debug("Resetting generation due to consumer pro-actively leaving the group"); + + // always set the state to un-joined + state = MemberState.UNJOINED; + resetGeneration(); } @@ -1014,7 +1024,8 @@ public abstract class AbstractCoordinator implements Closeable { // visible for testing synchronized RequestFuture<Void> sendHeartbeatRequest() { - log.debug("Sending Heartbeat request to coordinator {}", coordinator); + log.debug("Sending Heartbeat request with generation {} and member id {} to coordinator {}", + generation.generationId, generation.memberId, coordinator); HeartbeatRequest.Builder requestBuilder = new HeartbeatRequest.Builder(new HeartbeatRequestData() .setGroupId(rebalanceConfig.groupId) @@ -1022,10 +1033,16 @@ public abstract class AbstractCoordinator implements Closeable { .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)) .setGenerationId(this.generation.generationId)); return client.send(coordinator, requestBuilder) - .compose(new HeartbeatResponseHandler()); + .compose(new HeartbeatResponseHandler(generation)); } private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> { + private final Generation sentGeneration; + + private HeartbeatResponseHandler(final Generation generation) { + this.sentGeneration = generation; + } + @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { sensors.heartbeatSensor.record(response.requestLatencyMs()); @@ -1035,7 +1052,7 @@ public abstract class AbstractCoordinator implements Closeable { future.complete(null); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { - log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid.", + 
log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid", coordinator()); markCoordinatorUnknown(); future.raise(error); @@ -1044,14 +1061,14 @@ public abstract class AbstractCoordinator implements Closeable { requestRejoin(); future.raise(error); } else if (error == Errors.ILLEGAL_GENERATION) { - log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId); + log.info("Attempt to heartbeat failed since generation {} is not current", sentGeneration.generationId); resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error); future.raise(error); } else if (error == Errors.FENCED_INSTANCE_ID) { log.error("Received fatal exception: group.instance.id gets fenced"); future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID) { - log.info("Attempt to heartbeat failed since member id {} is not valid.", generation.memberId); + log.info("Attempt to heartbeat failed since member id {} is not valid.", sentGeneration.memberId); resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error); future.raise(error); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { @@ -1293,8 +1310,8 @@ public abstract class AbstractCoordinator implements Closeable { AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs); } else { heartbeat.sentHeartbeat(now); - - sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() { + final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest(); + heartbeatFuture.addListener(new RequestFutureListener<Void>() { @Override public void onSuccess(Void value) { synchronized (AbstractCoordinator.this) { @@ -1432,6 +1449,4 @@ public abstract class AbstractCoordinator implements Closeable { final boolean hasValidMemberId() { return generation != Generation.NO_GENERATION && generation.hasMemberId(); } - - } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index ab49837..b0f476a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -349,6 +349,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator { Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions()); + // should at least encode the short version + if (assignmentBuffer.remaining() < 2) + throw new IllegalStateException("There is insufficient bytes available to read assignment from the sync-group response (" + + "actual byte size " + assignmentBuffer.remaining() + ") , this is not expected; " + + "it is possible that the leader's assign function is buggy and did not return any assignment for this member, " + + "or because static member is configured and the protocol is buggy hence did not get the assignment for this member"); + Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer); Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java index 4d19ef4..2e9a5ad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java @@ -32,6 +32,7 @@ public final class Heartbeat { private final Timer pollTimer; private volatile long lastHeartbeatSend = 0L; + private volatile boolean heartbeatInFlight = false; public Heartbeat(GroupRebalanceConfig config, Time time) { @@ -56,60 +57,66 @@ public final class Heartbeat { pollTimer.reset(maxPollIntervalMs); } - public void sentHeartbeat(long now) { - this.lastHeartbeatSend = now; + boolean hasInflight() 
{ + return heartbeatInFlight; + } + + void sentHeartbeat(long now) { + lastHeartbeatSend = now; + heartbeatInFlight = true; update(now); heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs); } - public void failHeartbeat() { + void failHeartbeat() { update(time.milliseconds()); + heartbeatInFlight = false; heartbeatTimer.reset(rebalanceConfig.retryBackoffMs); } - public void receiveHeartbeat() { + void receiveHeartbeat() { update(time.milliseconds()); + heartbeatInFlight = false; sessionTimer.reset(rebalanceConfig.sessionTimeoutMs); } - public boolean shouldHeartbeat(long now) { + boolean shouldHeartbeat(long now) { update(now); return heartbeatTimer.isExpired(); } - public long lastHeartbeatSend() { + long lastHeartbeatSend() { return this.lastHeartbeatSend; } - public long timeToNextHeartbeat(long now) { + long timeToNextHeartbeat(long now) { update(now); return heartbeatTimer.remainingMs(); } - public boolean sessionTimeoutExpired(long now) { + boolean sessionTimeoutExpired(long now) { update(now); return sessionTimer.isExpired(); } - public void resetTimeouts() { + void resetTimeouts() { update(time.milliseconds()); sessionTimer.reset(rebalanceConfig.sessionTimeoutMs); pollTimer.reset(maxPollIntervalMs); heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs); } - public void resetSessionTimeout() { + void resetSessionTimeout() { update(time.milliseconds()); sessionTimer.reset(rebalanceConfig.sessionTimeoutMs); } - public boolean pollTimeoutExpired(long now) { + boolean pollTimeoutExpired(long now) { update(now); return pollTimer.isExpired(); } - public long lastPollTime() { + long lastPollTime() { return pollTimer.currentTimeMs(); } - -} \ No newline at end of file +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index e2315cd..eb88e3a 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -466,6 +466,52 @@ public class AbstractCoordinatorTest { } @Test + public void testHeartbeatUnknownMemberResponseDuringRebalancing() throws InterruptedException { + setupCoordinator(); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + + final int generation = 1; + + mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + + coordinator.ensureActiveGroup(); + + final AbstractCoordinator.Generation currGen = coordinator.generation(); + + // let the heartbeat request to send out a request + mockTime.sleep(HEARTBEAT_INTERVAL_MS); + + TestUtils.waitForCondition(() -> coordinator.heartbeat().hasInflight(), 2000, + "The heartbeat request was not sent in time after 2000ms elapsed"); + + assertTrue(coordinator.heartbeat().hasInflight()); + + // set the client to re-join group + mockClient.respond(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID)); + + coordinator.requestRejoin(); + + TestUtils.waitForCondition(() -> { + coordinator.ensureActiveGroup(new MockTime(1L).timer(100L)); + return !coordinator.heartbeat().hasInflight(); + }, + 2000, + "The heartbeat response was not been received in time after 2000ms elapsed"); + + assertFalse(coordinator.heartbeat().hasInflight()); + + // the generation should be reset but the rebalance should still proceed + assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation()); + + mockClient.respond(joinGroupFollowerResponse(generation, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + + coordinator.ensureActiveGroup(); + assertEquals(currGen, coordinator.generation()); + } + + @Test public 
void testHeartbeatRequestWithFencedInstanceIdException() throws InterruptedException { setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index e4525de..772d51d 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -202,41 +202,7 @@ class GroupCoordinator(val brokerId: Int, val newMemberId = group.generateMemberId(clientId, groupInstanceId) if (group.hasStaticMember(groupInstanceId)) { - val oldMemberId = group.getStaticMemberId(groupInstanceId) - info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " + - s"old member id $oldMemberId will be removed.") - - val currentLeader = group.leaderOrNull - val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId) - // Heartbeat of old member id will expire without effect since the group no longer contains that member id. - // New heartbeat shall be scheduled with new member id. 
- completeAndScheduleNextHeartbeatExpiration(group, member) - - val knownStaticMember = group.get(newMemberId) - group.updateMember(knownStaticMember, protocols, responseCallback) - - group.currentState match { - case Stable | CompletingRebalance => - info(s"Static member joins during ${group.currentState} stage will not trigger rebalance.") - group.maybeInvokeJoinCallback(member, JoinGroupResult( - members = List.empty, - memberId = newMemberId, - generationId = group.generationId, - protocolType = group.protocolType, - protocolName = group.protocolName, - // We want to avoid current leader performing trivial assignment while the group - // is in stable/awaiting sync stage, because the new assignment in leader's next sync call - // won't be broadcast by a stable/awaiting sync group. This could be guaranteed by - // always returning the old leader id so that the current leader won't assume itself - // as a leader based on the returned message, since the new member.id won't match - // returned leader id, therefore no assignment will be performed. - leaderId = currentLeader, - error = Errors.NONE)) - case Empty | Dead => - throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " + - s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.") - case PreparingRebalance => - } + updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback) } else if (requireKnownMemberId) { // If member id required (dynamic membership), register the member in the pending member list // and send back a response to call for another join group request with allocated member id. 
@@ -246,7 +212,7 @@ class GroupCoordinator(val brokerId: Int, addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs) responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED)) } else { - debug(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " + + info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " + s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.") addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId, clientId, clientHost, protocolType, protocols, group, responseCallback) @@ -287,7 +253,7 @@ class GroupCoordinator(val brokerId: Int, } } else { val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId) - if (group.isStaticMemberFenced(memberId, groupInstanceId)) { + if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) { // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately. responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID)) } else if (!group.has(memberId) || groupInstanceIdNotFound) { @@ -397,7 +363,7 @@ class GroupCoordinator(val brokerId: Int, // coordinator OR the group is in a transient unstable phase. Let the member retry // finding the correct coordinator and rejoin. 
responseCallback(SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE)) - } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) { + } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "sync-group")) { responseCallback(SyncGroupResult(Errors.FENCED_INSTANCE_ID)) } else if (!group.has(memberId)) { responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID)) @@ -483,7 +449,7 @@ class GroupCoordinator(val brokerId: Int, val memberId = leavingMember.memberId val groupInstanceId = Option(leavingMember.groupInstanceId) if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID - && group.isStaticMemberFenced(memberId, groupInstanceId)) { + && group.isStaticMemberFenced(memberId, groupInstanceId, "leave-group")) { memberLeaveError(leavingMember, Errors.FENCED_INSTANCE_ID) } else if (group.isPendingMember(memberId)) { if (groupInstanceId.isDefined) { @@ -640,7 +606,7 @@ class GroupCoordinator(val brokerId: Int, // coordinator OR the group is in a transient unstable phase. Let the member retry // finding the correct coordinator and rejoin. responseCallback(Errors.COORDINATOR_NOT_AVAILABLE) - } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) { + } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "heartbeat")) { responseCallback(Errors.FENCED_INSTANCE_ID) } else if (!group.has(memberId)) { responseCallback(Errors.UNKNOWN_MEMBER_ID) @@ -739,7 +705,7 @@ class GroupCoordinator(val brokerId: Int, // coordinator OR the group is in a transient unstable phase. Let the member retry // finding the correct coordinator and rejoin. 
responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE }) - } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) { + } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) { responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID }) } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { // Enforce member id when it is set. @@ -766,7 +732,7 @@ class GroupCoordinator(val brokerId: Int, // coordinator OR the group is in a transient unstable phase. Let the member retry // finding the correct coordinator and rejoin. responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE }) - } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) { + } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "commit-offsets")) { responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID }) } else if (generationId < 0 && group.is(Empty)) { // The group is only using Kafka to store offsets. 
@@ -1025,7 +991,55 @@ class GroupCoordinator(val brokerId: Int, } else { group.removePendingMember(memberId) } - maybePrepareRebalance(group, s"Adding new member $memberId with group instanceid $groupInstanceId") + maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId") + } + + private def updateStaticMemberAndRebalance(group: GroupMetadata, + newMemberId: String, + groupInstanceId: Option[String], + protocols: List[(String, Array[Byte])], + responseCallback: JoinCallback): Unit = { + val oldMemberId = group.getStaticMemberId(groupInstanceId) + info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " + + s"old member id $oldMemberId will be removed.") + + val currentLeader = group.leaderOrNull + val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId) + // Heartbeat of old member id will expire without effect since the group no longer contains that member id. + // New heartbeat shall be scheduled with new member id. + completeAndScheduleNextHeartbeatExpiration(group, member) + + val knownStaticMember = group.get(newMemberId) + group.updateMember(knownStaticMember, protocols, responseCallback) + + group.currentState match { + case Stable => + info(s"Static member joins during Stable stage will not trigger rebalance.") + group.maybeInvokeJoinCallback(member, JoinGroupResult( + members = List.empty, + memberId = newMemberId, + generationId = group.generationId, + protocolType = group.protocolType, + protocolName = group.protocolName, + // We want to avoid current leader performing trivial assignment while the group + // is in stable stage, because the new assignment in leader's next sync call + // won't be broadcast by a stable group. 
This could be guaranteed by + // always returning the old leader id so that the current leader won't assume itself + // as a leader based on the returned message, since the new member.id won't match + // returned leader id, therefore no assignment will be performed. + leaderId = currentLeader, + error = Errors.NONE)) + case CompletingRebalance => + // if the group is in after-sync stage, upon getting a new join-group of a known static member + // we should still trigger a new rebalance, since the old member may already be sent to the leader + // for assignment, and hence when the assignment gets back there would be a mismatch of the old member id + // with the new replaced member id. As a result the new member id would not get any assignment. + prepareRebalance(group, s"Updating metadata for static member ${member.memberId} with instance id $groupInstanceId") + case Empty | Dead => + throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " + + s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.") + case PreparingRebalance => + } } private def updateMemberAndRebalance(group: GroupMetadata, diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 44b33d3..049dbef 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -385,11 +385,13 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState * 2. 
group stored member.id doesn't match with given member.id */ def isStaticMemberFenced(memberId: String, - groupInstanceId: Option[String]): Boolean = { + groupInstanceId: Option[String], + operation: String): Boolean = { if (hasStaticMember(groupInstanceId) && getStaticMemberId(groupInstanceId) != memberId) { - error(s"given member.id $memberId is identified as a known static member ${groupInstanceId.get}," + - s"but not matching the expected member.id ${getStaticMemberId(groupInstanceId)}") + error(s"given member.id $memberId is identified as a known static member ${groupInstanceId.get}, " + + s"but not matching the expected member.id ${getStaticMemberId(groupInstanceId)} during $operation, will " + + s"respond with instance fenced error") true } else false diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 0b3390e..5c7a52a 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -173,7 +173,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { @Test def testMaxPollIntervalMs(): Unit = { - this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000.toString) + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString) this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2000.toString) @@ -187,9 +187,10 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(1, listener.callsToAssigned) assertEquals(0, listener.callsToRevoked) - Thread.sleep(3500) + // after we extend longer than max.poll a rebalance should be triggered + // NOTE we need to have a relatively much larger value than max.poll to let heartbeat expired for sure + Thread.sleep(3000) - // we should fall out of 
the group and need to rebalance awaitRebalance(consumer, listener) assertEquals(2, listener.callsToAssigned) assertEquals(1, listener.callsToRevoked) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 55f0d1e..ed5ea0d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -616,10 +616,10 @@ class GroupCoordinatorTest { groupId, CompletingRebalance, Some(protocolType)) - assertEquals(leaderJoinGroupResult.leaderId, leaderJoinGroupResult.memberId) + assertEquals(rebalanceResult.leaderId, leaderJoinGroupResult.memberId) assertEquals(rebalanceResult.leaderId, leaderJoinGroupResult.leaderId) - // Old member shall be getting a successful join group response. + // Old follower shall be getting a successful join group response. val oldFollowerJoinGroupResult = Await.result(oldFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS)) checkJoinGroupResult(oldFollowerJoinGroupResult, Errors.NONE, @@ -629,31 +629,37 @@ class GroupCoordinatorTest { CompletingRebalance, Some(protocolType), expectedLeaderId = leaderJoinGroupResult.memberId) + assertEquals(rebalanceResult.followerId, oldFollowerJoinGroupResult.memberId) + assertEquals(rebalanceResult.leaderId, oldFollowerJoinGroupResult.leaderId) + assertTrue(getGroup(groupId).is(CompletingRebalance)) + // Duplicate follower joins group with unknown member id will trigger member.id replacement, + // and will also trigger a rebalance under CompletingRebalance state; the old follower sync callback + // will return fenced exception while broker replaces the member identity with the duplicate follower joins. 
EasyMock.reset(replicaManager) val oldFollowerSyncGroupFuture = sendSyncGroupFollower(groupId, oldFollowerJoinGroupResult.generationId, oldFollowerJoinGroupResult.memberId, Some(protocolType), Some(protocolName), followerInstanceId) - - // Duplicate follower joins group with unknown member id will trigger member.id replacement. EasyMock.reset(replicaManager) val duplicateFollowerJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, groupInstanceId = followerInstanceId) timer.advanceClock(1) - - // Old follower sync callback will return fenced exception while broker replaces the member identity. val oldFollowerSyncGroupResult = Await.result(oldFollowerSyncGroupFuture, Duration(1, TimeUnit.MILLISECONDS)) assertEquals(Errors.FENCED_INSTANCE_ID, oldFollowerSyncGroupResult.error) + assertTrue(getGroup(groupId).is(PreparingRebalance)) + + timer.advanceClock(GroupInitialRebalanceDelay + 1) + timer.advanceClock(DefaultRebalanceTimeout + 1) - // Duplicate follower will get the same response as old follower. val duplicateFollowerJoinGroupResult = Await.result(duplicateFollowerJoinFuture, Duration(1, TimeUnit.MILLISECONDS)) checkJoinGroupResult(duplicateFollowerJoinGroupResult, Errors.NONE, - rebalanceResult.generation + 1, - Set.empty, + rebalanceResult.generation + 2, + Set(followerInstanceId), // this follower will become the new leader, and hence it would have the member list groupId, CompletingRebalance, Some(protocolType), - expectedLeaderId = leaderJoinGroupResult.memberId) + expectedLeaderId = duplicateFollowerJoinGroupResult.memberId) + assertTrue(getGroup(groupId).is(CompletingRebalance)) } @Test