Repository: kafka Updated Branches: refs/heads/trunk eb59c8124 -> 54767bbba
KAFKA-4033; Revise partition assignment semantics on consumer subscription changes (KIP-70) This PR changes topic subscription semantics so a change in subscription does not immediately cause a rebalance. Instead, the next poll or the next scheduled metadata refresh will update the assigned partitions. Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Reviewers: Jason Gustafson Closes #1726 from vahidhashemian/KAFKA-4033 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54767bbb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54767bbb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54767bbb Branch: refs/heads/trunk Commit: 54767bbba5bf18c01f50bb40c339433bfed09627 Parents: eb59c81 Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Authored: Thu Sep 8 19:56:53 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Sep 8 19:56:53 2016 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 13 +- .../consumer/internals/ConsumerCoordinator.java | 36 +- .../consumer/internals/SubscriptionState.java | 9 - .../clients/consumer/KafkaConsumerTest.java | 653 ++++++++++++++----- .../internals/SubscriptionStateTest.java | 63 +- 5 files changed, 568 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 85d5194..ca8f1f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ 
-552,7 +552,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration. * <p> * Valid configuration strings are documented at {@link ConsumerConfig} - * + * * @param properties The consumer configuration properties */ public KafkaConsumer(Properties properties) { @@ -706,9 +706,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { Metrics metrics, SubscriptionState subscriptions, Metadata metadata, - boolean autoCommitEnabled, - int autoCommitIntervalMs, - int heartbeatIntervalMs, long retryBackoffMs, long requestTimeoutMs) { this.clientId = clientId; @@ -872,6 +869,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public void unsubscribe() { acquire(); try { + // make sure the offsets of topic partitions the consumer is unsubscribing from + // are committed since there will be no following rebalance + this.coordinator.maybeAutoCommitOffsetsNow(); + log.debug("Unsubscribed all topics or patterns and assigned partitions"); this.subscriptions.unsubscribe(); this.coordinator.maybeLeaveGroup(); @@ -913,6 +914,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { topics.add(topic); } + // make sure the offsets of topic partitions the consumer is unsubscribing from + // are committed since there will be no following rebalance + this.coordinator.maybeAutoCommitOffsetsNow(); + log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", ")); this.subscriptions.assignFromUser(new HashSet<>(partitions)); metadata.setTopics(topics); http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index b8df50e..ff0d669 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -514,22 +514,31 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.nextAutoCommitDeadline = now + retryBackoffMs; } else if (now >= nextAutoCommitDeadline) { this.nextAutoCommitDeadline = now + autoCommitIntervalMs; - commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() { - @Override - public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { - if (exception != null) { - log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage()); - if (exception instanceof RetriableException) - nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline); - } else { - log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId); - } - } - }); + doAutoCommitOffsetsAsync(); } } } + public void maybeAutoCommitOffsetsNow() { + if (autoCommitEnabled && !coordinatorUnknown()) + doAutoCommitOffsetsAsync(); + } + + private void doAutoCommitOffsetsAsync() { + commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() { + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { + if (exception != null) { + log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage()); + if (exception instanceof RetriableException) + nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline); + } else { + log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId); + } + } + }); + } + private void maybeAutoCommitOffsetsSync() { if (autoCommitEnabled) { try { @@ -807,7 +816,8 @@ public 
final class ConsumerCoordinator extends AbstractCoordinator { } public void invoke() { - callback.onComplete(offsets, exception); + if (callback != null) + callback.onComplete(offsets, exception); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 6d4c01b..dd0bce9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; @@ -128,13 +127,6 @@ public class SubscriptionState { if (!this.subscription.equals(topicsToSubscribe)) { this.subscription = topicsToSubscribe; this.groupSubscription.addAll(topicsToSubscribe); - - // Remove any assigned partitions which are no longer subscribed to - for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) { - TopicPartition tp = it.next(); - if (!subscription.contains(tp.topic())) - it.remove(); - } } } @@ -216,7 +208,6 @@ public class SubscriptionState { this.subscriptionType = SubscriptionType.NONE; } - public Pattern getSubscribedPattern() { return this.subscribedPattern; } http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 8d2ac00..dbe3d67 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -35,11 +35,13 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.Selectable; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.FetchResponse.PartitionData; import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupResponse; @@ -76,9 +78,15 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class KafkaConsumerTest { - private final String topic = "test"; - private final TopicPartition tp0 = new TopicPartition("test", 0); + private final TopicPartition tp0 = new TopicPartition(topic, 0); + private final TopicPartition tp1 = new TopicPartition(topic, 1); + + private final String topic2 = "test2"; + private final TopicPartition t2p0 = new TopicPartition(topic2, 0); + + private final String topic3 = "test3"; + private final TopicPartition t3p0 = new TopicPartition(topic3, 0); @Test public void testConstructorClose() throws Exception { @@ -318,9 +326,6 @@ public class KafkaConsumerTest { @Test public void verifyHeartbeatSent() throws Exception { - String topic = "topic"; - TopicPartition partition = new TopicPartition(topic, 0); - int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int 
heartbeatIntervalMs = 1000; @@ -336,47 +341,18 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); - consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - - } - - @Override - public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - // set initial position so we don't need a lookup - for (TopicPartition partition : partitions) - consumer.seek(partition, 0); - } - }); - - // lookup coordinator - client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node); - - Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); - - // join group - client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator); - - // sync group - client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator); + consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer)); + Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); // initial fetch - client.prepareResponseFrom(fetchResponse(partition, 0, 0), node); + client.prepareResponseFrom(fetchResponse(tp0, 0, 0), node); consumer.poll(0); - assertEquals(Collections.singleton(partition), consumer.assignment()); + assertEquals(Collections.singleton(tp0), consumer.assignment()); - final AtomicBoolean heartbeatReceived = new AtomicBoolean(false); - client.prepareResponseFrom(new MockClient.RequestMatcher() { - @Override - public boolean matches(ClientRequest request) { - heartbeatReceived.set(true); - 
return true; - } - }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator); + AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator); // heartbeat interval is 2 seconds time.sleep(heartbeatIntervalMs); @@ -389,9 +365,6 @@ public class KafkaConsumerTest { @Test public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception { - String topic = "topic"; - TopicPartition partition = new TopicPartition(topic, 0); - int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 1000; @@ -407,47 +380,19 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); - consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); - } - - @Override - public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - // set initial position so we don't need a lookup - for (TopicPartition partition : partitions) - consumer.seek(partition, 0); - } - }); - - // lookup coordinator - client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node); - - Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); - - // join group - client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator); - - // sync group - client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator); + consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer)); + Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); 
consumer.poll(0); // respond to the outstanding fetch so that we have data available on the next poll - client.respondFrom(fetchResponse(partition, 0, 5), node); + client.respondFrom(fetchResponse(tp0, 0, 5), node); client.poll(0, time.milliseconds()); - client.prepareResponseFrom(fetchResponse(partition, 5, 0), node); - final AtomicBoolean heartbeatReceived = new AtomicBoolean(false); - client.prepareResponseFrom(new MockClient.RequestMatcher() { - @Override - public boolean matches(ClientRequest request) { - heartbeatReceived.set(true); - return true; - } - }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator); + client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node); + AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator); time.sleep(heartbeatIntervalMs); Thread.sleep(heartbeatIntervalMs); @@ -459,8 +404,6 @@ public class KafkaConsumerTest { @Test public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { - String topic = "topic"; - final TopicPartition partition = new TopicPartition(topic, 0); int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 3000; int heartbeatIntervalMs = 2000; @@ -476,26 +419,22 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); - consumer.assign(Arrays.asList(partition)); - consumer.seekToBeginning(Arrays.asList(partition)); + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); + consumer.assign(Arrays.asList(tp0)); + consumer.seekToBeginning(Arrays.asList(tp0)); // there shouldn't be any need to lookup the coordinator or fetch committed offsets. // we just lookup the starting position and send the record fetch. 
- client.prepareResponse(listOffsetsResponse(Collections.singletonMap(partition, 50L), Errors.NONE.code())); - client.prepareResponse(fetchResponse(partition, 50L, 5)); + client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L), Errors.NONE.code())); + client.prepareResponse(fetchResponse(tp0, 50L, 5)); ConsumerRecords<String, String> records = consumer.poll(0); assertEquals(5, records.count()); - assertEquals(55L, consumer.position(partition)); + assertEquals(55L, consumer.position(tp0)); } @Test public void testCommitsFetchedDuringAssign() { - String topic = "topic"; - final TopicPartition partition1 = new TopicPartition(topic, 0); - final TopicPartition partition2 = new TopicPartition(topic, 1); - long offset1 = 10000; long offset2 = 20000; @@ -514,8 +453,8 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); - consumer.assign(Arrays.asList(partition1)); + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); + consumer.assign(Arrays.asList(tp0)); // lookup coordinator client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node); @@ -523,28 +462,25 @@ public class KafkaConsumerTest { // fetch offset for one topic client.prepareResponseFrom( - offsetResponse(Collections.singletonMap(partition1, offset1), Errors.NONE.code()), + offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE.code()), coordinator); - assertEquals(offset1, consumer.committed(partition1).offset()); + assertEquals(offset1, consumer.committed(tp0).offset()); - consumer.assign(Arrays.asList(partition1, partition2)); + consumer.assign(Arrays.asList(tp0, tp1)); // fetch offset for two topics Map<TopicPartition, Long> offsets = new HashMap<>(); - offsets.put(partition1, offset1); - 
offsets.put(partition2, offset2); + offsets.put(tp0, offset1); + offsets.put(tp1, offset2); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE.code()), coordinator); - assertEquals(offset1, consumer.committed(partition1).offset()); - assertEquals(offset2, consumer.committed(partition2).offset()); + assertEquals(offset1, consumer.committed(tp0).offset()); + assertEquals(offset2, consumer.committed(tp1).offset()); } @Test public void testAutoCommitSentBeforePositionUpdate() { - String topic = "topic"; - final TopicPartition partition = new TopicPartition(topic, 0); - int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 3000; @@ -563,56 +499,23 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); - consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - - } - - @Override - public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - // set initial position so we don't need a lookup - for (TopicPartition partition : partitions) - consumer.seek(partition, 0); - } - }); - - // lookup coordinator - client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node); - - Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); - - // join group - client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator); + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); - // sync group - client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator); + consumer.subscribe(Arrays.asList(topic), 
getConsumerRebalanceListener(consumer)); + Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); consumer.poll(0); // respond to the outstanding fetch so that we have data available on the next poll - client.respondFrom(fetchResponse(partition, 0, 5), node); + client.respondFrom(fetchResponse(tp0, 0, 5), node); client.poll(0, time.milliseconds()); time.sleep(autoCommitIntervalMs); - client.prepareResponseFrom(fetchResponse(partition, 5, 0), node); + client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node); // no data has been returned to the user yet, so the committed offset should be 0 - final AtomicBoolean commitReceived = new AtomicBoolean(false); - client.prepareResponseFrom(new MockClient.RequestMatcher() { - @Override - public boolean matches(ClientRequest request) { - OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body()); - OffsetCommitRequest.PartitionData partitionData = commitRequest.offsetData().get(partition); - if (partitionData.offset == 0) { - commitReceived.set(true); - return true; - } - return false; - } - }, new OffsetCommitResponse(Collections.singletonMap(partition, Errors.NONE.code())).toStruct(), coordinator); + AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 0); consumer.poll(0); @@ -621,9 +524,6 @@ public class KafkaConsumerTest { @Test public void testWakeupWithFetchDataAvailable() { - String topic = "topic"; - final TopicPartition partition = new TopicPartition(topic, 0); - int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 3000; @@ -642,8 +542,360 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); - consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { + 
rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); + + consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer)); + prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); + + consumer.poll(0); + + // respond to the outstanding fetch so that we have data available on the next poll + client.respondFrom(fetchResponse(tp0, 0, 5), node); + client.poll(0, time.milliseconds()); + + consumer.wakeup(); + + try { + consumer.poll(0); + fail(); + } catch (WakeupException e) { + } + + // make sure the position hasn't been updated + assertEquals(0, consumer.position(tp0)); + + // the next poll should return the completed fetch + ConsumerRecords<String, String> records = consumer.poll(0); + assertEquals(5, records.count()); + } + + /** + * Verify that when a consumer changes its topic subscription its assigned partitions + * do not immediately change, and the latest consumed offsets of its to-be-revoked + * partitions are properly committed (when auto-commit is enabled). + * Upon unsubscribing from subscribed topics the consumer subscription and assignment + * are both updated right away and its consumed offsets are committed (if auto-commit + * is enabled). 
+ */ + @Test + public void testSubscriptionChangesWithAutoCommitEnabled() { + int rebalanceTimeoutMs = 60000; + int sessionTimeoutMs = 30000; + int heartbeatIntervalMs = 3000; + + // adjust auto commit interval lower than heartbeat so we don't need to deal with + // a concurrent heartbeat request + int autoCommitIntervalMs = 1000; + + Time time = new MockTime(); + MockClient client = new MockClient(time); + Map<String, Integer> tpCounts = new HashMap<>(); + tpCounts.put(topic, 1); + tpCounts.put(topic2, 1); + tpCounts.put(topic3, 1); + Cluster cluster = TestUtils.singletonCluster(tpCounts); + Node node = cluster.nodes().get(0); + client.setNode(node); + Metadata metadata = new Metadata(0, Long.MAX_VALUE); + metadata.update(cluster, time.milliseconds()); + PartitionAssignor assignor = new RangeAssignor(); + + final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); + + // initial subscription + consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer)); + + // verify that subscription has changed but assignment is still unchanged + assertTrue(consumer.subscription().size() == 2); + assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic2)); + assertTrue(consumer.assignment().isEmpty()); + + // mock rebalance responses + Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); + + consumer.poll(0); + + // verify that subscription is still the same, and now assignment has caught up + assertTrue(consumer.subscription().size() == 2); + assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic2)); + assertTrue(consumer.assignment().size() == 2); + assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t2p0)); + + // mock a response to the outstanding fetch so that we have data available 
on the next poll + Map<TopicPartition, FetchInfo> fetches1 = new HashMap<>(); + fetches1.put(tp0, new FetchInfo(0, 1)); + fetches1.put(t2p0, new FetchInfo(0, 10)); + client.respondFrom(fetchResponse(fetches1), node); + client.poll(0, time.milliseconds()); + + ConsumerRecords<String, String> records = consumer.poll(0); + + // verify that the fetch occurred as expected + assertEquals(11, records.count()); + assertEquals(1L, consumer.position(tp0)); + assertEquals(10L, consumer.position(t2p0)); + + + // subscription change + consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer)); + + // verify that subscription has changed but assignment is still unchanged + assertTrue(consumer.subscription().size() == 2); + assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic3)); + assertTrue(consumer.assignment().size() == 2); + assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t2p0)); + + // mock the offset commit response for to be revoked partitions + Map<TopicPartition, Long> partitionOffsets1 = new HashMap<>(); + partitionOffsets1.put(tp0, 1L); + partitionOffsets1.put(t2p0, 10L); + AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets1); + + // mock rebalance responses + prepareRebalance(client, node, assignor, Arrays.asList(tp0, t3p0), coordinator); + + // mock a response to the outstanding fetch so that we have data available on the next poll + Map<TopicPartition, FetchInfo> fetches2 = new HashMap<>(); + fetches2.put(tp0, new FetchInfo(1, 1)); + fetches2.put(t3p0, new FetchInfo(0, 100)); + client.respondFrom(fetchResponse(fetches2), node); + client.poll(0, time.milliseconds()); + client.prepareResponse(fetchResponse(fetches2)); + + records = consumer.poll(0); + + // verify that the fetch occurred as expected + assertEquals(101, records.count()); + assertEquals(2L, consumer.position(tp0)); + assertEquals(100L, 
consumer.position(t3p0)); + + // verify that the offset commits occurred as expected + assertTrue(commitReceived.get()); + + // verify that subscription is still the same, and now assignment has caught up + assertTrue(consumer.subscription().size() == 2); + assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic3)); + assertTrue(consumer.assignment().size() == 2); + assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t3p0)); + + + // mock the offset commit response for to be revoked partitions + Map<TopicPartition, Long> partitionOffsets2 = new HashMap<>(); + partitionOffsets2.put(tp0, 2L); + partitionOffsets2.put(t3p0, 100L); + commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets2); + + // unsubscribe + consumer.unsubscribe(); + + // verify that subscription and assignment are both cleared + assertTrue(consumer.subscription().isEmpty()); + assertTrue(consumer.assignment().isEmpty()); + + // verify that the offset commits occurred as expected + assertTrue(commitReceived.get()); + + consumer.close(); + } + + /** + * Verify that when a consumer changes its topic subscription its assigned partitions + * do not immediately change, and the consumed offsets of its to-be-revoked partitions + * are not committed (when auto-commit is disabled). + * Upon unsubscribing from subscribed topics, the assigned partitions immediately + * change but if auto-commit is disabled the consumer offsets are not committed. 
+ */ + @Test + public void testSubscriptionChangesWithAutoCommitDisabled() { + int rebalanceTimeoutMs = 60000; + int sessionTimeoutMs = 30000; + int heartbeatIntervalMs = 3000; + int autoCommitIntervalMs = 1000; + + Time time = new MockTime(); + MockClient client = new MockClient(time); + Map<String, Integer> tpCounts = new HashMap<>(); + tpCounts.put(topic, 1); + tpCounts.put(topic2, 1); + Cluster cluster = TestUtils.singletonCluster(tpCounts); + Node node = cluster.nodes().get(0); + client.setNode(node); + Metadata metadata = new Metadata(0, Long.MAX_VALUE); + metadata.update(cluster, time.milliseconds()); + PartitionAssignor assignor = new RangeAssignor(); + + final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs); + + // initial subscription + consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer)); + + // verify that subscription has changed but assignment is still unchanged + assertTrue(consumer.subscription().equals(Collections.singleton(topic))); + assertTrue(consumer.assignment().isEmpty()); + + // mock rebalance responses + prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); + + consumer.poll(0); + + // verify that subscription is still the same, and now assignment has caught up + assertTrue(consumer.subscription().equals(Collections.singleton(topic))); + assertTrue(consumer.assignment().equals(Collections.singleton(tp0))); + + consumer.poll(0); + + // subscription change: switch from topic to topic2 + consumer.subscribe(Arrays.asList(topic2), getConsumerRebalanceListener(consumer)); + + // verify that subscription has changed but assignment is still unchanged + assertTrue(consumer.subscription().equals(Collections.singleton(topic2))); + assertTrue(consumer.assignment().equals(Collections.singleton(tp0))); + + // the auto commit is disabled, so no offset commit request should be sent + for (ClientRequest req:
client.requests()) + assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id); + + // subscription change: unsubscribe from all topics + consumer.unsubscribe(); + + // verify that subscription and assignment are both updated + assertTrue(consumer.subscription().isEmpty()); + assertTrue(consumer.assignment().isEmpty()); + + // the auto commit is disabled, so no offset commit request should be sent + for (ClientRequest req: client.requests()) + assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id); + + consumer.close(); + } + + @Test + public void testManualAssignmentChangeWithAutoCommitEnabled() { + int rebalanceTimeoutMs = 60000; + int sessionTimeoutMs = 30000; + int heartbeatIntervalMs = 3000; + int autoCommitIntervalMs = 1000; + + Time time = new MockTime(); + MockClient client = new MockClient(time); + Map<String, Integer> tpCounts = new HashMap<>(); + tpCounts.put(topic, 1); + tpCounts.put(topic2, 1); + Cluster cluster = TestUtils.singletonCluster(tpCounts); + Node node = cluster.nodes().get(0); + client.setNode(node); + Metadata metadata = new Metadata(0, Long.MAX_VALUE); + metadata.update(cluster, time.milliseconds()); + PartitionAssignor assignor = new RangeAssignor(); + + final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); + + // look up the coordinator + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node); + Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); + + // manual assignment + consumer.assign(Arrays.asList(tp0)); + consumer.seekToBeginning(Arrays.asList(tp0)); + + // fetch the committed offset for tp0 + client.prepareResponseFrom( + offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE.code()), + coordinator); + assertEquals(0, consumer.committed(tp0).offset()); + + // verify that assignment immediately changes +
assertTrue(consumer.assignment().equals(Collections.singleton(tp0))); + + // there shouldn't be any need to look up the coordinator or fetch committed offsets. + // we just look up the starting position and send the record fetch. + client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE.code())); + client.prepareResponse(fetchResponse(tp0, 10L, 1)); + + ConsumerRecords<String, String> records = consumer.poll(0); + assertEquals(1, records.count()); + assertEquals(11L, consumer.position(tp0)); + + // mock the offset commit response for the partition about to be revoked (tp0) + AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 11); + + // new manual assignment + consumer.assign(Arrays.asList(t2p0)); + + // verify that assignment immediately changes + assertTrue(consumer.assignment().equals(Collections.singleton(t2p0))); + // verify that the offset commit for tp0 occurred as expected + assertTrue(commitReceived.get()); + + consumer.close(); + } + + @Test + public void testManualAssignmentChangeWithAutoCommitDisabled() { + int rebalanceTimeoutMs = 60000; + int sessionTimeoutMs = 30000; + int heartbeatIntervalMs = 3000; + int autoCommitIntervalMs = 1000; + + Time time = new MockTime(); + MockClient client = new MockClient(time); + Map<String, Integer> tpCounts = new HashMap<>(); + tpCounts.put(topic, 1); + tpCounts.put(topic2, 1); + Cluster cluster = TestUtils.singletonCluster(tpCounts); + Node node = cluster.nodes().get(0); + client.setNode(node); + Metadata metadata = new Metadata(0, Long.MAX_VALUE); + metadata.update(cluster, time.milliseconds()); + PartitionAssignor assignor = new RangeAssignor(); + + final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs); + + // look up the coordinator + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node); + Node
coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); + + // manual assignment + consumer.assign(Arrays.asList(tp0)); + consumer.seekToBeginning(Arrays.asList(tp0)); + + // fetch the committed offset for tp0 + client.prepareResponseFrom( + offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE.code()), + coordinator); + assertEquals(0, consumer.committed(tp0).offset()); + + // verify that assignment immediately changes + assertTrue(consumer.assignment().equals(Collections.singleton(tp0))); + + // there shouldn't be any need to look up the coordinator or fetch committed offsets. + // we just look up the starting position and send the record fetch. + client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE.code())); + client.prepareResponse(fetchResponse(tp0, 10L, 1)); + + ConsumerRecords<String, String> records = consumer.poll(0); + assertEquals(1, records.count()); + assertEquals(11L, consumer.position(tp0)); + + // new manual assignment + consumer.assign(Arrays.asList(t2p0)); + + // verify that assignment immediately changes + assertTrue(consumer.assignment().equals(Collections.singleton(t2p0))); + + // the auto commit is disabled, so no offset commit request should be sent + for (ClientRequest req: client.requests()) + assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id); + + consumer.close(); + } + + private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) { + return new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { @@ -655,39 +907,68 @@ public class KafkaConsumerTest { for (TopicPartition partition : partitions) consumer.seek(partition, 0); } - }); - - // lookup coordinator - client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node); + }; + } - Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(),
node.port()); + private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) { + if (coordinator == null) { + // lookup coordinator + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node); + coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); + } // join group client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator); // sync group - client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator); + client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE.code()), coordinator); - consumer.poll(0); + return coordinator; + } - // respond to the outstanding fetch so that we have data available on the next poll - client.respondFrom(fetchResponse(partition, 0, 5), node); - client.poll(0, time.milliseconds()); + private AtomicBoolean prepareHeartbeatResponse(MockClient client, Node coordinator) { + final AtomicBoolean heartbeatReceived = new AtomicBoolean(false); + client.prepareResponseFrom(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + heartbeatReceived.set(true); + return true; + } + }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator); + return heartbeatReceived; + } - consumer.wakeup(); + private AtomicBoolean prepareOffsetCommitResponse(MockClient client, Node coordinator, final Map<TopicPartition, Long> partitionOffsets) { + final AtomicBoolean commitReceived = new AtomicBoolean(true); + Map<TopicPartition, Short> response = new HashMap<>(); + for (TopicPartition partition : partitionOffsets.keySet()) + response.put(partition, Errors.NONE.code()); - try { - consumer.poll(0); - fail(); - } catch (WakeupException e) { - } + client.prepareResponseFrom(new MockClient.RequestMatcher() { + @Override + public boolean 
matches(ClientRequest request) { + OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body()); + for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) { + OffsetCommitRequest.PartitionData partitionData = commitRequest.offsetData().get(partitionOffset.getKey()); + // verify that the expected offset has been committed + if (partitionData.offset != partitionOffset.getValue()) { + commitReceived.set(false); + return false; + } + } + return true; + } + }, offsetCommitResponse(response), coordinator); + return commitReceived; + } - // make sure the position hasn't been updated - assertEquals(0, consumer.position(partition)); + private AtomicBoolean prepareOffsetCommitResponse(MockClient client, Node coordinator, final TopicPartition partition, final long offset) { + return prepareOffsetCommitResponse(client, coordinator, Collections.singletonMap(partition, offset)); + } - // the next poll should return the completed fetch - ConsumerRecords<String, String> records = consumer.poll(0); - assertEquals(5, records.count()); + private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) { + OffsetCommitResponse response = new OffsetCommitResponse(responseData); + return response.toStruct(); } private Struct joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) { @@ -717,16 +998,27 @@ public class KafkaConsumerTest { return new ListOffsetResponse(partitionData).toStruct(); } - private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) { - MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); - for (int i = 0; i < count; i++) - records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes()); - records.close(); - FetchResponse response = new FetchResponse(Collections.singletonMap( - tp, new FetchResponse.PartitionData(Errors.NONE.code(), 5, 
records.buffer())), 0); + private Struct fetchResponse(Map<TopicPartition, FetchInfo> fetches) { + Map<TopicPartition, PartitionData> tpResponses = new HashMap<>(); + for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) { + TopicPartition partition = fetchEntry.getKey(); + long fetchOffset = fetchEntry.getValue().offset; + int fetchCount = fetchEntry.getValue().count; + MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); + for (int i = 0; i < fetchCount; i++) + records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes()); + records.close(); + tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.buffer())); // NOTE(review): the removed single-partition helper passed 5 instead of 0 here; confirm 0 is intended + } + FetchResponse response = new FetchResponse(tpResponses, 0); return response.toStruct(); } + private Struct fetchResponse(TopicPartition partition, long fetchOffset, int count) { + FetchInfo fetchInfo = new FetchInfo(fetchOffset, count); + return fetchResponse(Collections.singletonMap(partition, fetchInfo)); + } + private KafkaConsumer<String, String> newConsumer(Time time, KafkaClient client, Metadata metadata, @@ -734,6 +1026,7 @@ public class KafkaConsumerTest { int rebalanceTimeoutMs, int sessionTimeoutMs, int heartbeatIntervalMs, + boolean autoCommitEnabled, int autoCommitIntervalMs) { // create a consumer with mocked time and mocked network @@ -742,7 +1035,6 @@ public class KafkaConsumerTest { String metricGroupPrefix = "consumer"; long retryBackoffMs = 100; long requestTimeoutMs = 30000; - boolean autoCommitEnabled = true; boolean excludeInternalTopics = true; int minBytes = 1; int maxWaitMs = 500; @@ -808,10 +1100,17 @@ public class KafkaConsumerTest { metrics, subscriptions, metadata, - autoCommitEnabled, - autoCommitIntervalMs, - heartbeatIntervalMs, retryBackoffMs, requestTimeoutMs); } + + private static class FetchInfo { + long offset; // first offset of the generated records + int count; // number of records to generate + + FetchInfo(long offset, int count) { + this.offset = offset; +
this.count = count; + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 783f0e6..a950cad 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -39,8 +39,9 @@ public class SubscriptionStateTest { private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST); private final String topic = "test"; private final String topic1 = "test1"; - private final TopicPartition tp0 = new TopicPartition("test", 0); - private final TopicPartition tp1 = new TopicPartition("test", 1); + private final TopicPartition tp0 = new TopicPartition(topic, 0); + private final TopicPartition tp1 = new TopicPartition(topic, 1); + private final TopicPartition t1p0 = new TopicPartition(topic1, 0); private final MockRebalanceListener rebalanceListener = new MockRebalanceListener(); @Test @@ -60,6 +61,62 @@ public class SubscriptionStateTest { } @Test + public void partitionAssignmentChangeOnTopicSubscription() { + state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); + // assigned partitions should immediately change + assertEquals(2, state.assignedPartitions().size()); + assertTrue(state.assignedPartitions().contains(tp0)); + assertTrue(state.assignedPartitions().contains(tp1)); + + state.unsubscribe(); + // assigned partitions should immediately change + assertTrue(state.assignedPartitions().isEmpty()); + + state.subscribe(singleton(topic1), rebalanceListener); + // assigned partitions should remain unchanged +
assertTrue(state.assignedPartitions().isEmpty()); + + state.assignFromSubscribed(asList(t1p0)); + // assigned partitions should immediately change + assertEquals(singleton(t1p0), state.assignedPartitions()); + + state.subscribe(singleton(topic), rebalanceListener); + // assigned partitions should remain unchanged + assertEquals(singleton(t1p0), state.assignedPartitions()); + + state.unsubscribe(); + // assigned partitions should immediately change + assertTrue(state.assignedPartitions().isEmpty()); + } + + @Test + public void partitionAssignmentChangeOnPatternSubscription() { + state.subscribe(Pattern.compile(".*"), rebalanceListener); + // assigned partitions should remain unchanged + assertTrue(state.assignedPartitions().isEmpty()); + + state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1))); + // assigned partitions should remain unchanged + assertTrue(state.assignedPartitions().isEmpty()); + + state.assignFromSubscribed(asList(tp1)); + // assigned partitions should immediately change + assertEquals(singleton(tp1), state.assignedPartitions()); + + state.subscribe(Pattern.compile(".*t"), rebalanceListener); + // assigned partitions should remain unchanged + assertEquals(singleton(tp1), state.assignedPartitions()); + + state.subscribeFromPattern(singleton(topic)); + // assigned partitions should remain unchanged + assertEquals(singleton(tp1), state.assignedPartitions()); + + state.unsubscribe(); + // assigned partitions should immediately change + assertTrue(state.assignedPartitions().isEmpty()); + } + + @Test public void partitionReset() { state.assignFromUser(singleton(tp0)); state.seek(tp0, 5); @@ -217,5 +274,5 @@ public class SubscriptionStateTest { } } - + }