squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2587851616


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1027,12 +1027,11 @@ private void addTaskProcessIdFromActiveTasksWithEpochs(
                 if (partitionsOrNull == null) {
                     partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedTaskPartitionsWithEpochs.size());
                 }
-                for (Integer partitionId : 
assignedTaskPartitionsWithEpochs.keySet()) {
+                for (Integer partitionId: 
assignedTaskPartitionsWithEpochs.keySet()) {
                     String prevValue = partitionsOrNull.put(partitionId, 
processId);
                     if (prevValue != null) {
-                        throw new IllegalStateException(
-                            String.format("Cannot set the process ID of %s-%s 
to %s because the partition is " +
-                                "still owned by process ID %s", subtopologyId, 
partitionId, processId, prevValue));
+                        log.debug("[GroupId {}] Cannot set the process ID of 
{}-{} to {} because the partition is " +
+                            "still owned by process ID {}", groupId, 
subtopologyId, partitionId, processId, prevValue);

Review Comment:
   Technically we are actually setting the process ID. Let's update the log 
message to be more accurate.
   ```suggestion
                           log.debug("[GroupId {}] Setting the process ID of 
{}-{} to {} even though the partition is " +
                               "still owned by process ID {}", groupId, 
subtopologyId, partitionId, processId, prevValue);
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1027,12 +1027,11 @@ private void addTaskProcessIdFromActiveTasksWithEpochs(
                 if (partitionsOrNull == null) {
                     partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedTaskPartitionsWithEpochs.size());
                 }
-                for (Integer partitionId : 
assignedTaskPartitionsWithEpochs.keySet()) {
+                for (Integer partitionId: 
assignedTaskPartitionsWithEpochs.keySet()) {

Review Comment:
   nit: missing space
   ```suggestion
                   for (Integer partitionId : 
assignedTaskPartitionsWithEpochs.keySet()) {
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1004,7 +1004,7 @@ private void removeTaskProcessIdsFromSet(
      *
      * @param tasks     The assigned tasks.
      * @param processId The process ID.
-     * @throws IllegalStateException if the partition already has an epoch 
assigned. package-private for testing.
+     * @throws IllegalStateException if the existing partition has larger 
epoch than the new one. package-private for testing.

Review Comment:
   I think we don't compare epochs for streams now?
   ```suggestion
        * package-private for testing.
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -312,13 +325,19 @@ public void testRemovePartitionEpochs() {
 
         consumerGroup.updateMember(m1);
 
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> 
consumerGroup.removePartitionEpochs(
+        // Removing with incorrect epoch should do nothing. 
+        // A debug message is logged, no exception is thrown.
+        consumerGroup.removePartitionEpochs(
             mkAssignment(
                 mkTopicAssignment(fooTopicId, 1)
             ),
             11
-        ));
+        );
+        assertEquals(
+            mkAssignment(mkTopicAssignment(fooTopicId, 1)),
+            consumerGroup.getOrMaybeCreateMember("m1", 
false).assignedPartitions()
+        );

Review Comment:
   ```suggestion
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -388,11 +389,16 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
 
         streamsGroup.updateMember(m1);
 
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> 
streamsGroup.removeTaskProcessIds(
-            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, 
mkTasks(fooSubtopologyId, 1)),
+        // Removing with incorrect process id should do nothing. 
+        // A debug message is logged, no exception is thrown.
+        streamsGroup.removeTaskProcessIds(
+            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 9, 
mkTasks(fooSubtopologyId, 1)),
             "process1"
-        ));
+        );
+        assertEquals(m1.assignedTasks(), 
streamsGroup.getMemberOrThrow("m1").assignedTasks());

Review Comment:
   ```suggestion
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -409,16 +415,17 @@ public void testAddTaskProcessIds() {
             "process"
         );
 
-        // Changing the epoch should fail because the owner of the partition
-        // should remove it first.
-        assertThrows(IllegalStateException.class, () -> 
streamsGroup.addTaskProcessId(
+        // We allow changing the epoch with the same process id. 
+        streamsGroup.addTaskProcessId(
             new TasksTupleWithEpochs(
                 mkTasksPerSubtopologyWithCommonEpoch(10, 
mkTasks(fooSubtopologyId, 1)),
                 mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
                 mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
             ),
             "process"
-        ));
+        );
+
+        assertEquals("process", 
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));

Review Comment:
   Let's test the process ID replacement branch.
   ```suggestion
               "process1"
           );
   
           // We allow replacing with a different process id. 
           streamsGroup.addTaskProcessId(
               new TasksTupleWithEpochs(
                   mkTasksPerSubtopologyWithCommonEpoch(10, 
mkTasks(fooSubtopologyId, 1)),
                   mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
                   mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
               ),
               "process2"
           );
   
           assertEquals("process2", 
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to