rondagostino commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1416193447
##########
core/src/test/scala/unit/kafka/server/ControllerApisTest.scala:
##########
@@ -1139,31 +1139,24 @@ class ControllerApisTest {
}
@Test
- def testAssignReplicasToDirsReturnsUnsupportedVersion(): Unit = {
+ def testAssignReplicasToDirs(): Unit = {
val controller = mock(classOf[Controller])
- val controllerApis = createControllerApis(None, controller)
+ val authorizer = mock(classOf[Authorizer])
+ val controllerApis = createControllerApis(Some(authorizer), controller)
+
+ val request = new AssignReplicasToDirsRequest.Builder(new
AssignReplicasToDirsRequestData()).build()
+
+ when(authorizer.authorize(any[RequestContext],
ArgumentMatchers.eq(Collections.singletonList(new Action(
+ AclOperation.CLUSTER_ACTION,
+ new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME,
PatternType.LITERAL),
+ 1, true, true
+ )))))
+ .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
+ when(controller.assignReplicasToDirs(any[ControllerRequestContext],
ArgumentMatchers.eq(request.data)))
+ .thenThrow(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception())
Review Comment:
Need to return something like `completableFuture.completeExceptionally()`
here as opposed to throwing an exception directly.
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData
listPartitionReassignments(
return response;
}
+ ControllerResult<AssignReplicasToDirsResponseData>
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+ int brokerId = request.brokerId();
+ clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+ BrokerRegistration brokerRegistration =
clusterControl.brokerRegistrations().get(brokerId);
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ AssignReplicasToDirsResponseData response = new
AssignReplicasToDirsResponseData();
+ Set<TopicIdPartition> leaderAndIsrUpdates = new HashSet<>();
+ for (AssignReplicasToDirsRequestData.DirectoryData reqDir :
request.directories()) {
+ Uuid dirId = reqDir.id();
+ AssignReplicasToDirsResponseData.DirectoryData resDir = new
AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+ for (AssignReplicasToDirsRequestData.TopicData reqTopic :
reqDir.topics()) {
+ Uuid topicId = reqTopic.topicId();
+ Errors topicError = Errors.NONE;
+ TopicControlInfo topicControl = this.topics.get(topicId);
Review Comment:
nit: `topicControlInfo` or `topicInfo` might be a better name than
`topicControl` -- get the "Info" in there somehow.
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData
listPartitionReassignments(
return response;
}
+ ControllerResult<AssignReplicasToDirsResponseData>
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+ int brokerId = request.brokerId();
+ clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+ BrokerRegistration brokerRegistration =
clusterControl.brokerRegistrations().get(brokerId);
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ AssignReplicasToDirsResponseData response = new
AssignReplicasToDirsResponseData();
+ Set<TopicIdPartition> leaderAndIsrUpdates = new HashSet<>();
+ for (AssignReplicasToDirsRequestData.DirectoryData reqDir :
request.directories()) {
+ Uuid dirId = reqDir.id();
+ AssignReplicasToDirsResponseData.DirectoryData resDir = new
AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+ for (AssignReplicasToDirsRequestData.TopicData reqTopic :
reqDir.topics()) {
+ Uuid topicId = reqTopic.topicId();
+ Errors topicError = Errors.NONE;
+ TopicControlInfo topicControl = this.topics.get(topicId);
+ if (topicControl == null) {
+ log.warn("AssignReplicasToDirsRequest from broker {}
references unknown topic ID {}", brokerId, topicId);
+ topicError = Errors.UNKNOWN_TOPIC_ID;
+ }
+ AssignReplicasToDirsResponseData.TopicData resTopic = new
AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
+ for (AssignReplicasToDirsRequestData.PartitionData
reqPartition : reqTopic.partitions()) {
+ int partitionIndex = reqPartition.partitionIndex();
+ Errors partitionError = topicError;
+ if (topicError == Errors.NONE) {
+ String topicName = topicControl.name;
+ PartitionRegistration partitionRegistration =
topicControl.parts.get(partitionIndex);
+ if (partitionRegistration == null) {
+ log.warn("AssignReplicasToDirsRequest from broker
{} references unknown partition {}-{}", brokerId, topicName, partitionIndex);
+ partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ } else if
(!Replicas.contains(partitionRegistration.replicas, brokerId)) {
+ log.warn("AssignReplicasToDirsRequest from broker
{} references non assigned partition {}-{}", brokerId, topicName,
partitionIndex);
+ partitionError = Errors.NOT_LEADER_OR_FOLLOWER;
+ } else {
+ Optional<ApiMessageAndVersion>
partitionChangeRecord = new PartitionChangeBuilder(
+ partitionRegistration,
+ topicId,
+ partitionIndex,
+ new LeaderAcceptor(clusterControl,
partitionRegistration),
+ featureControl.metadataVersion(),
+ getTopicEffectiveMinIsr(topicName)
+ )
+ .setDirectory(brokerId, dirId)
+ .setDefaultDirProvider(clusterDescriber)
+ .build();
+ partitionChangeRecord.ifPresent(records::add);
+
+ if (!brokerRegistration.hasOnlineDir(dirId)) {
Review Comment:
Do we need to do this search for every partition? I think we could just do
it once above and then reuse the result here?
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData
listPartitionReassignments(
return response;
}
+ ControllerResult<AssignReplicasToDirsResponseData>
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+ int brokerId = request.brokerId();
+ clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+ BrokerRegistration brokerRegistration =
clusterControl.brokerRegistrations().get(brokerId);
Review Comment:
Need to check for `null` to ensure it is actually registered.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]