cadonna commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r444067825



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1953,6 +1975,54 @@ public void 
shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         EasyMock.verify(consumerClient);
     }
 
+    @Test(timeout = 30 * 1000)
+    public void shouldCompleteLargeAssignmentInAReasonableAmountOfTime() {
+        builder.addSource(null, "source", null, null, null, 
TOPICS_LIST_XL.toArray(new String[0]));
+        builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), 
"processor");
+
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {
+            subscriptions.put("consumer-" + i,
+                              new Subscription(
+                                  TOPICS_LIST_XL,
+                                  getInfo(uuidForInt(i), EMPTY_TASKS, 
EMPTY_TASKS).encode())
+            );
+        }
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        createMockAdminClient(CHANGELOG_END_OFFSETS_XL);
+        
configurePartitionAssignorWith(singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
 3));
+
+        final Map<String, Assignment> assignments =
+            partitionAssignor.assign(CLUSTER_METADATA_XL, new 
GroupSubscription(subscriptions)).groupAssignment();
+
+        // Use the assignment to generate the subscriptions' prev task data 
for the next rebalance
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {
+            final String consumer = "consumer-" + i;
+            final Assignment assignment = assignments.get(consumer);
+            final AssignmentInfo info = 
AssignmentInfo.decode(assignment.userData());
+
+            subscriptions.put("consumer-" + i,
+                              new Subscription(
+                                  TOPICS_LIST_XL,
+                                  getInfo(uuidForInt(i), new 
HashSet<>(info.activeTasks()), info.standbyTasks().keySet()).encode(),
+                                  assignment.partitions())
+            );
+        }
+
+        final Map<String, Assignment> secondAssignments =
+            partitionAssignor.assign(CLUSTER_METADATA_XL, new 
GroupSubscription(subscriptions)).groupAssignment();
+    }
+
+    private static List<PartitionInfo> getPartitionInfos(final int numTopics, 
final int numPartitionsPerTopic) {
+        final List<PartitionInfo> partitionInfos = new ArrayList<>();
+        for (int t = 1; t <= numTopics; ++t) { // topic numbering starts from 1
+            for (int p = 0; p < numPartitionsPerTopic; ++p) {
+                partitionInfos.add(new PartitionInfo("topic" + t, p, 
Node.noNode(), new Node[0], new Node[0]));
+            }
+        }
+        return  partitionInfos;
+    }

Review comment:
       prop: Could you please move this method closer to where it is used, 
i.e., around line 200? 

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1932,10 +1954,10 @@ public void 
shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         streamsBuilder.table("topic1", Materialized.as("store"));
 
         subscriptions.put("consumer10",
-            new Subscription(
-                singletonList("topic1"),
-                defaultSubscriptionInfo.encode()
-            ));
+                          new Subscription(
+                              singletonList("topic1"),
+                              defaultSubscriptionInfo.encode()
+                          ));

Review comment:
       prop: 
   
   ```
   subscriptions.put("consumer10", new Subscription(singletonList("topic1"), 
defaultSubscriptionInfo.encode()));
   
   ```
   

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1953,6 +1975,54 @@ public void 
shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         EasyMock.verify(consumerClient);
     }
 
+    @Test(timeout = 30 * 1000)
+    public void shouldCompleteLargeAssignmentInAReasonableAmountOfTime() {
+        builder.addSource(null, "source", null, null, null, 
TOPICS_LIST_XL.toArray(new String[0]));
+        builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), 
"processor");
+
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {
+            subscriptions.put("consumer-" + i,
+                              new Subscription(
+                                  TOPICS_LIST_XL,
+                                  getInfo(uuidForInt(i), EMPTY_TASKS, 
EMPTY_TASKS).encode())
+            );
+        }
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        createMockAdminClient(CHANGELOG_END_OFFSETS_XL);
+        
configurePartitionAssignorWith(singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
 3));
+
+        final Map<String, Assignment> assignments =
+            partitionAssignor.assign(CLUSTER_METADATA_XL, new 
GroupSubscription(subscriptions)).groupAssignment();
+
+        // Use the assignment to generate the subscriptions' prev task data 
for the next rebalance
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {
+            final String consumer = "consumer-" + i;
+            final Assignment assignment = assignments.get(consumer);
+            final AssignmentInfo info = 
AssignmentInfo.decode(assignment.userData());
+
+            subscriptions.put("consumer-" + i,
+                              new Subscription(
+                                  TOPICS_LIST_XL,
+                                  getInfo(uuidForInt(i), new 
HashSet<>(info.activeTasks()), info.standbyTasks().keySet()).encode(),
+                                  assignment.partitions())
+            );
+        }
+
+        final Map<String, Assignment> secondAssignments =

Review comment:
       req: Could you please remove variable `secondAssignments` since it is 
never used?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1953,6 +1975,54 @@ public void 
shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         EasyMock.verify(consumerClient);
     }
 
+    @Test(timeout = 30 * 1000)
+    public void shouldCompleteLargeAssignmentInAReasonableAmountOfTime() {
+        builder.addSource(null, "source", null, null, null, 
TOPICS_LIST_XL.toArray(new String[0]));
+        builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), 
"processor");
+
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {
+            subscriptions.put("consumer-" + i,
+                              new Subscription(
+                                  TOPICS_LIST_XL,
+                                  getInfo(uuidForInt(i), EMPTY_TASKS, 
EMPTY_TASKS).encode())
+            );
+        }
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        createMockAdminClient(CHANGELOG_END_OFFSETS_XL);
+        
configurePartitionAssignorWith(singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
 3));
+
+        final Map<String, Assignment> assignments =
+            partitionAssignor.assign(CLUSTER_METADATA_XL, new 
GroupSubscription(subscriptions)).groupAssignment();
+
+        // Use the assignment to generate the subscriptions' prev task data 
for the next rebalance
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {

Review comment:
       Q: I am wondering if it would be better to extract the two assignments 
to two scale tests, so that when one fails we immediately know whether the 
startup assignment or the intermediate assignment is slow. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to