This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c75dc5e KAFKA-9701 (fix): Only check protocol name when generation is valid (#8324) c75dc5e is described below commit c75dc5e2e09cee4ba47a9b6c484cb225acdb086e Author: Boyang Chen <boy...@confluent.io> AuthorDate: Fri Mar 20 21:26:57 2020 -0700 KAFKA-9701 (fix): Only check protocol name when generation is valid (#8324) This bug was introduced by #7994 with a too-strong consistency check. The check is too strong because a reset generation operation could be called in between the joinGroupRequest -> joinGroupResponse -> SyncGroupRequest -> SyncGroupResponse sequence of events, if the user calls unsubscribe in the middle of consumer#poll(). The proper fix is to avoid the protocol name check when the generation is invalid. Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../consumer/internals/AbstractCoordinator.java | 3 +- .../internals/AbstractCoordinatorTest.java | 66 +++++++++++++++------- 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 2d93766..8427b92 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -913,7 +913,8 @@ public abstract class AbstractCoordinator implements Closeable { } private boolean isProtocolNameInconsistent(String protocolName) { - return protocolName != null && !protocolName.equals(generation().protocolName); + return protocolName != null && generation() != Generation.NO_GENERATION + && !protocolName.equals(generation().protocolName); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index f958836..e2315cd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -49,7 +49,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.Test; @@ -365,8 +364,8 @@ public class AbstractCoordinatorTest { @Test public void testJoinGroupProtocolTypeAndName() { - String wrongProtocolType = "wrong-type"; - String wrongProtocolName = "wrong-name"; + final String wrongProtocolType = "wrong-type"; + final String wrongProtocolName = "wrong-name"; // No Protocol Type in both JoinGroup and SyncGroup responses assertTrue(joinGroupWithProtocolTypeAndName(null, null, null)); @@ -391,6 +390,39 @@ public class AbstractCoordinatorTest { () -> joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, PROTOCOL_TYPE, wrongProtocolName)); } + @Test + public void testNoGenerationWillNotTriggerProtocolNameCheck() { + final String wrongProtocolName = "wrong-name"; + + setupCoordinator(); + mockClient.reset(); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(mockTime.timer(0)); + + mockClient.prepareResponse(body -> { + if (!(body instanceof JoinGroupRequest)) { + return false; + } + JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body; + return joinGroupRequest.data().protocolType().equals(PROTOCOL_TYPE); + }, joinGroupFollowerResponse(defaultGeneration, memberId, + "memberid", Errors.NONE, PROTOCOL_TYPE)); + + mockClient.prepareResponse(body -> { + if (!(body instanceof SyncGroupRequest)) { + return false; + } + 
coordinator.resetGenerationOnLeaveGroup(); + + SyncGroupRequest syncGroupRequest = (SyncGroupRequest) body; + return syncGroupRequest.data.protocolType().equals(PROTOCOL_TYPE) + && syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME); + }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, wrongProtocolName)); + + // No exception shall be thrown as the generation is reset. + coordinator.joinGroupIfNeeded(mockTime.timer(100L)); + } + private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType, String syncGroupResponseProtocolType, String syncGroupResponseProtocolName) { @@ -665,7 +697,7 @@ public class AbstractCoordinatorTest { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -703,7 +735,7 @@ public class AbstractCoordinatorTest { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -738,7 +770,7 @@ public class AbstractCoordinatorTest { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -811,7 +843,7 @@ public class AbstractCoordinatorTest { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -884,7 +916,7 @@ public class AbstractCoordinatorTest { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -945,7 +977,7 
@@ public class AbstractCoordinatorTest { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -990,12 +1022,8 @@ public class AbstractCoordinatorTest { private void awaitFirstHeartbeat(final AtomicBoolean heartbeatReceived) throws Exception { mockTime.sleep(HEARTBEAT_INTERVAL_MS); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return heartbeatReceived.get(); - } - }, 3000, "Should have received a heartbeat request after joining the group"); + TestUtils.waitForCondition(heartbeatReceived::get, + 3000, "Should have received a heartbeat request after joining the group"); } private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) { @@ -1063,10 +1091,10 @@ public class AbstractCoordinatorTest { private int onJoinCompleteInvokes = 0; private boolean wakeupOnJoinComplete = false; - public DummyCoordinator(GroupRebalanceConfig rebalanceConfig, - ConsumerNetworkClient client, - Metrics metrics, - Time time) { + DummyCoordinator(GroupRebalanceConfig rebalanceConfig, + ConsumerNetworkClient client, + Metrics metrics, + Time time) { super(rebalanceConfig, new LogContext(), client, metrics, METRIC_GROUP_PREFIX, time); }