showuon commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r860546268
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -298,20 +329,21 @@ protected Map<String, ByteBuffer>
performTaskAssignment(String leaderId, long ma
Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
diff(taskAssignments, currentTaskAssignments);
+ previousAssignment = computePreviousAssignment(toRevoke,
connectorAssignments, taskAssignments, lostAssignments);
+ previousGenerationId = currentGenerationId;
+ previousMembers = memberAssignments.keySet();
+
log.debug("Incremental connector assignments: {}",
incrementalConnectorAssignments);
log.debug("Incremental task assignments: {}",
incrementalTaskAssignments);
- coordinator.leaderState(new LeaderState(memberConfigs,
connectorAssignments, taskAssignments));
Review Comment:
Thanks for the explanation! Make sense to me!
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -44,93 +35,49 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import static
org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
import static
org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-@RunWith(Parameterized.class)
-public class IncrementalCooperativeAssignorTest {
- @Rule
- public MockitoRule rule = MockitoJUnit.rule();
-
- @Mock
- private WorkerCoordinator coordinator;
-
- @Captor
- ArgumentCaptor<Map<String, ExtendedAssignment>> assignmentsCapture;
-
- @Parameters
- public static Iterable<?> mode() {
- return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
- }
Review Comment:
Yes, you are right. It looks like we don't care `protocolVersion` that much.
The only place we check it, is when in the public `performAssignment` method.
```java
short protocolVersion = memberConfigs.values().stream()
.allMatch(state -> state.assignment().version() ==
CONNECT_PROTOCOL_V2)
? CONNECT_PROTOCOL_V2
: CONNECT_PROTOCOL_V1;
```
Did we have tests for that? If no, could we add tests for it?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1375,24 +1171,6 @@ private void assertCompleteAllocation() {
});
}
- private void verifyCoordinatorInteractions() {
- verify(coordinator, times(rebalanceNum)).configSnapshot();
- verify(coordinator, times(rebalanceNum)).leaderState(any());
- verify(coordinator, times(2 * rebalanceNum)).generationId();
- verify(coordinator, times(rebalanceNum)).memberId();
- verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
- }
Review Comment:
I agree all the verification is not much helpful, except the `leaderState`.
Could we add some tests to verify the `Coordinator.leaderState` after
`performTaskAssignment`? I think Mock is also fine, just want to make sure it
is well tested. Thank you.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]