hachikuji commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r679534669
##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -631,7 +658,34 @@ public void
unregister(RaftClient.Listener<ApiMessageAndVersion> listener) {
@Override
public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
- return scheduleAtomicAppend(epoch, batch);
+ if (batch.isEmpty()) {
+ throw new IllegalArgumentException("Batch cannot be empty");
+ }
+
+ List<ApiMessageAndVersion> first = batch.subList(0, batch.size() / 2);
+ List<ApiMessageAndVersion> second = batch.subList(batch.size() / 2,
batch.size());
+
+ assertEquals(batch.size(), first.size() + second.size());
+ assertFalse(second.isEmpty());
+
+ OptionalLong firstOffset = first
+ .stream()
+ .mapToLong(record -> scheduleAtomicAppend(epoch,
Collections.singletonList(record)))
+ .max();
+
+ if (firstOffset.isPresent() &&
resignAfterNonAtomicCommit.getAndSet(false)) {
+ // Emulate losing leadering in them middle of a non-atomic append
by not writing
Review comment:
nit: losing leadership?
##########
File path:
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -607,4 +612,143 @@ public void testEarlyControllerResults() throws Throwable
{
}
}
}
+
+ @Test
+ public void testMissingInMemorySnapshot() throws Exception {
+ int numBrokers = 3;
+ int numPartitions = 3;
+ String topicName = "topic-name";
+
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1,
Optional.empty())) {
+ try (QuorumControllerTestEnv controlEnv =
Review comment:
nit: could we pull this into the first `try`?
##########
File path:
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -607,4 +612,143 @@ public void testEarlyControllerResults() throws Throwable
{
}
}
}
+
+ @Test
+ public void testMissingInMemorySnapshot() throws Exception {
+ int numBrokers = 3;
+ int numPartitions = 3;
+ String topicName = "topic-name";
+
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1,
Optional.empty())) {
+ try (QuorumControllerTestEnv controlEnv =
+ new QuorumControllerTestEnv(logEnv, b ->
b.setConfigDefs(CONFIGS))) {
+ QuorumController controller = controlEnv.activeController();
+
+ Map<Integer, Long> brokerEpochs = registerBrokers(controller,
numBrokers);
+
+ // Create a lot of partitions
+ List<CreatableReplicaAssignment> partitions = IntStream
+ .range(0, numPartitions)
+ .mapToObj(partitionIndex -> new
CreatableReplicaAssignment()
+ .setPartitionIndex(partitionIndex)
+ .setBrokerIds(Arrays.asList(0, 1, 2))
+ )
+ .collect(Collectors.toList());
+
+ Uuid topicId = controller.createTopics(
+ new CreateTopicsRequestData()
+ .setTopics(
+ new CreatableTopicCollection(
+ Collections.singleton(
+ new CreatableTopic()
+ .setName(topicName)
+ .setNumPartitions(-1)
+ .setReplicationFactor((short) -1)
+ .setAssignments(new
CreatableReplicaAssignmentCollection(partitions.iterator()))
+ ).iterator()
+ )
+ )
+ ).get().topics().find(topicName).topicId();
+
+ // Create a lot of alter isr
+ List<AlterIsrRequestData.PartitionData> alterIsrs = IntStream
+ .range(0, numPartitions)
+ .mapToObj(partitionIndex -> {
+ PartitionRegistration partitionRegistration =
controller.replicationControl().getPartition(
+ topicId,
+ partitionIndex
+ );
+
+ return new AlterIsrRequestData.PartitionData()
+ .setPartitionIndex(partitionIndex)
+ .setLeaderEpoch(partitionRegistration.leaderEpoch)
+
.setCurrentIsrVersion(partitionRegistration.partitionEpoch)
+ .setNewIsr(Arrays.asList(0, 1));
+ })
+ .collect(Collectors.toList());
+
+ AlterIsrRequestData.TopicData topicData = new
AlterIsrRequestData.TopicData()
+ .setName(topicName);
+ topicData.partitions().addAll(alterIsrs);
+
+ int leaderId = 0;
+ AlterIsrRequestData alterIsrRequest = new AlterIsrRequestData()
+ .setBrokerId(leaderId)
+ .setBrokerEpoch(brokerEpochs.get(leaderId));
+ alterIsrRequest.topics().add(topicData);
+
+ logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
+
+ assertThrows(
+ ExecutionException.class,
+ () -> controller.alterIsr(alterIsrRequest).get()
+ );
+
+ // Wait for the new active controller
+ final QuorumController newController =
controlEnv.activeController();
Review comment:
This confused me a little bit since we are trying to verify that the
state on the original controller resets properly. That is what is happening
here since there is only one controller in the test, but it is obscured a
little bit by the new variable. Maybe it would be clearer to use the original
reference and write this as:
```java
assertEquals(controller, controlEnv.activeController());
```
Also, is there an epoch or something we can bump to ensure the transition?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]