chia7712 commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r587072161
########## File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala ########## @@ -142,6 +148,199 @@ class ControllerApisTest { brokerRegistrationResponse.errorCounts().asScala) } + @Test + def testDeleteTopicsByName(): Unit = { + val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q") + val controller = new MockController.Builder().newInitialTopic("foo", fooId).build() + val controllerApis = createControllerApis(None, controller) + val request = new DeleteTopicsRequestData().setTopicNames( + util.Arrays.asList("foo", "bar", "quux", "quux")) + val expectedResponse = Set(new DeletableTopicResult().setName("quux"). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("Duplicate topic name."), + new DeletableTopicResult().setName("bar"). + setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()). + setErrorMessage("This server does not host this topic-partition."), + new DeletableTopicResult().setName("foo").setTopicId(fooId)) + assertEquals(expectedResponse, controllerApis.deleteTopics(request, + ApiKeys.DELETE_TOPICS.latestVersion().toInt, + true, + _ => Set.empty, + _ => Set.empty).asScala.toSet) + } + + @Test + def testDeleteTopicsById(): Unit = { + val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q") + val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw") + val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ") + val controller = new MockController.Builder().newInitialTopic("foo", fooId).build() + val controllerApis = createControllerApis(None, controller) + val request = new DeleteTopicsRequestData() + request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId)) + val response = Set(new DeletableTopicResult().setName(null).setTopicId(quuxId). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("Duplicate topic id."), + new DeletableTopicResult().setName(null).setTopicId(barId). + setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()). + setErrorMessage("This server does not host this topic ID."), + new DeletableTopicResult().setName("foo").setTopicId(fooId)) + assertEquals(response, controllerApis.deleteTopics(request, + ApiKeys.DELETE_TOPICS.latestVersion().toInt, + true, + _ => Set.empty, + _ => Set.empty).asScala.toSet) + } + + @Test + def testInvalidDeleteTopicsRequest(): Unit = { + val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q") + val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw") + val bazId = Uuid.fromString("YOS4oQ3UT9eSAZahN1ysSA") + val controller = new MockController.Builder(). + newInitialTopic("foo", fooId). + newInitialTopic("bar", barId).build() + val controllerApis = createControllerApis(None, controller) + val request = new DeleteTopicsRequestData() + request.topics().add(new DeleteTopicState().setName(null).setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName("foo").setTopicId(fooId)) + request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId)) + request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId)) + val response = Set(new DeletableTopicResult().setName(null).setTopicId(ZERO_UUID). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("Neither topic name nor id were specified."), + new DeletableTopicResult().setName("foo").setTopicId(fooId). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("You may not specify both topic name and topic id."), + new DeletableTopicResult().setName("bar").setTopicId(barId). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("The provided topic name maps to an ID that was already supplied."), + new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("Duplicate topic name."), + new DeletableTopicResult().setName(null).setTopicId(bazId). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("Duplicate topic id.")) + assertEquals(response, controllerApis.deleteTopics(request, + ApiKeys.DELETE_TOPICS.latestVersion().toInt, + false, + names => names.toSet, + names => names.toSet).asScala.toSet) + } + + @Test + def testNotAuthorizedToDeleteWithTopicExisting(): Unit = { + val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q") + val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw") + val bazId = Uuid.fromString("hr4TVh3YQiu3p16Awkka6w") + val quuxId = Uuid.fromString("5URoQzW_RJiERVZXJgUVLg") + val controller = new MockController.Builder(). + newInitialTopic("foo", fooId). + newInitialTopic("bar", barId). + newInitialTopic("baz", bazId). + newInitialTopic("quux", quuxId).build() + val controllerApis = createControllerApis(None, controller) + val request = new DeleteTopicsRequestData() + request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId)) + request.topics().add(new DeleteTopicState().setName("baz").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID)) + val response = Set(new DeletableTopicResult().setName(null).setTopicId(barId). + setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code). + setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message), + new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID). + setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code). + setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message), + new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID). + setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code). + setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message), + new DeletableTopicResult().setName("foo").setTopicId(fooId). + setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code). + setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)) + assertEquals(response, controllerApis.deleteTopics(request, + ApiKeys.DELETE_TOPICS.latestVersion().toInt, + false, + _ => Set("foo", "baz"), + _ => Set.empty).asScala.toSet) + } + + @Test + def testNotAuthorizedToDeleteWithTopicNotExisting(): Unit = { + val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw") + val controller = new MockController.Builder().build() + val props = new Properties() + props.put(KafkaConfig.DeleteTopicEnableProp, "false") + val controllerApis = createControllerApis(None, controller) + val request = new DeleteTopicsRequestData() + request.topics().add(new DeleteTopicState().setName("foo").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId)) + val expectedResponse = Set(new DeletableTopicResult().setName("foo"). + setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code). + setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message), + new DeletableTopicResult().setName("bar"). + setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code). + setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message), + new DeletableTopicResult().setName(null).setTopicId(barId). + setErrorCode(Errors.UNKNOWN_TOPIC_ID.code). + setErrorMessage(Errors.UNKNOWN_TOPIC_ID.message)) + assertEquals(expectedResponse, controllerApis.deleteTopics(request, + ApiKeys.DELETE_TOPICS.latestVersion().toInt, + false, + _ => Set("foo"), + _ => Set.empty).asScala.toSet) + } + + @Test + def testNotControllerErrorPreventsDeletingTopics(): Unit = { + val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q") + val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw") + val controller = new MockController.Builder(). + newInitialTopic("foo", fooId).build() + controller.setActive(false) Review comment: this controller is mock so disabling active works well for this test. However, I did not observe the check of control activity in production code. Could you share that with me? ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -541,6 +574,63 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors, return configChanges; } + Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) { + Map<String, ResultOrError<Uuid>> results = new HashMap<>(names.size()); + for (String name : names) { + if (name == null) { + results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name.")); + } else { + Uuid id = topicsByName.get(name, offset); + if (id == null) { + results.put(name, new ResultOrError<>( + new ApiError(UNKNOWN_TOPIC_OR_PARTITION))); + } else { + results.put(name, new ResultOrError<>(id)); + } + } + } + return results; + } + + Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) { + Map<Uuid, ResultOrError<String>> results = new HashMap<>(ids.size()); + for (Uuid id : ids) { Review comment: Should it check `ZERO_UUID` (use `INVALID_REQUEST` instead of `UNKNOWN_TOPIC_ID`)? ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -349,6 +359,29 @@ public void replay(PartitionChangeRecord record) { log.debug("Applied ISR change record: {}", record.toString()); } + public void replay(RemoveTopicRecord record) { + // Remove this topic from the topics map and the topicsByName map. + TopicControlInfo topic = topics.remove(record.topicId()); + if (topic == null) { + throw new RuntimeException("Can't find topic with ID " + record.topicId() + Review comment: Is `UnknownTopicIdException` more suitable? ########## File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala ########## @@ -142,6 +148,199 @@ class ControllerApisTest { brokerRegistrationResponse.errorCounts().asScala) } + @Test + def testDeleteTopicsByName(): Unit = { + val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q") + val controller = new MockController.Builder().newInitialTopic("foo", fooId).build() + val controllerApis = createControllerApis(None, controller) + val request = new DeleteTopicsRequestData().setTopicNames( + util.Arrays.asList("foo", "bar", "quux", "quux")) + val expectedResponse = Set(new DeletableTopicResult().setName("quux"). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("Duplicate topic name."), + new DeletableTopicResult().setName("bar"). + setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()). + setErrorMessage("This server does not host this topic-partition."), + new DeletableTopicResult().setName("foo").setTopicId(fooId)) + assertEquals(expectedResponse, controllerApis.deleteTopics(request, + ApiKeys.DELETE_TOPICS.latestVersion().toInt, + true, + _ => Set.empty, + _ => Set.empty).asScala.toSet) + } + + @Test + def testDeleteTopicsById(): Unit = { + val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q") + val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw") + val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ") + val controller = new MockController.Builder().newInitialTopic("foo", fooId).build() + val controllerApis = createControllerApis(None, controller) + val request = new DeleteTopicsRequestData() + request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId)) + val response = Set(new DeletableTopicResult().setName(null).setTopicId(quuxId). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("Duplicate topic id."), + new DeletableTopicResult().setName(null).setTopicId(barId). + setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()). + setErrorMessage("This server does not host this topic ID."), + new DeletableTopicResult().setName("foo").setTopicId(fooId)) + assertEquals(response, controllerApis.deleteTopics(request, + ApiKeys.DELETE_TOPICS.latestVersion().toInt, + true, + _ => Set.empty, + _ => Set.empty).asScala.toSet) + } + + @Test + def testInvalidDeleteTopicsRequest(): Unit = { + val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q") + val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw") + val bazId = Uuid.fromString("YOS4oQ3UT9eSAZahN1ysSA") + val controller = new MockController.Builder(). + newInitialTopic("foo", fooId). + newInitialTopic("bar", barId).build() + val controllerApis = createControllerApis(None, controller) + val request = new DeleteTopicsRequestData() + request.topics().add(new DeleteTopicState().setName(null).setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName("foo").setTopicId(fooId)) + request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId)) + request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId)) + val response = Set(new DeletableTopicResult().setName(null).setTopicId(ZERO_UUID). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("Neither topic name nor id were specified."), + new DeletableTopicResult().setName("foo").setTopicId(fooId). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("You may not specify both topic name and topic id."), + new DeletableTopicResult().setName("bar").setTopicId(barId). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("The provided topic name maps to an ID that was already supplied."), + new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("Duplicate topic name."), + new DeletableTopicResult().setName(null).setTopicId(bazId). + setErrorCode(Errors.INVALID_REQUEST.code()). + setErrorMessage("Duplicate topic id.")) + assertEquals(response, controllerApis.deleteTopics(request, + ApiKeys.DELETE_TOPICS.latestVersion().toInt, + false, + names => names.toSet, + names => names.toSet).asScala.toSet) + } + + @Test + def testNotAuthorizedToDeleteWithTopicExisting(): Unit = { + val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q") + val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw") + val bazId = Uuid.fromString("hr4TVh3YQiu3p16Awkka6w") + val quuxId = Uuid.fromString("5URoQzW_RJiERVZXJgUVLg") + val controller = new MockController.Builder(). + newInitialTopic("foo", fooId). + newInitialTopic("bar", barId). + newInitialTopic("baz", bazId). + newInitialTopic("quux", quuxId).build() + val controllerApis = createControllerApis(None, controller) + val request = new DeleteTopicsRequestData() + request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId)) + request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId)) + request.topics().add(new DeleteTopicState().setName("baz").setTopicId(ZERO_UUID)) + request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID)) + val response = Set(new DeletableTopicResult().setName(null).setTopicId(barId). + setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code). + setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message), + new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID). + setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code). + setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message), + new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID). + setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code). + setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message), + new DeletableTopicResult().setName("foo").setTopicId(fooId). + setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code). + setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)) + assertEquals(response, controllerApis.deleteTopics(request, + ApiKeys.DELETE_TOPICS.latestVersion().toInt, + false, + _ => Set("foo", "baz"), + _ => Set.empty).asScala.toSet) + } + + @Test + def testNotAuthorizedToDeleteWithTopicNotExisting(): Unit = { + val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw") + val controller = new MockController.Builder().build() + val props = new Properties() Review comment: This `props` is not used in this test case. ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -541,6 +574,63 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors, return configChanges; } + Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) { + Map<String, ResultOrError<Uuid>> results = new HashMap<>(names.size()); + for (String name : names) { + if (name == null) { + results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name.")); + } else { + Uuid id = topicsByName.get(name, offset); + if (id == null) { + results.put(name, new ResultOrError<>( + new ApiError(UNKNOWN_TOPIC_OR_PARTITION))); + } else { + results.put(name, new ResultOrError<>(id)); + } + } + } + return results; + } + + Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) { + Map<Uuid, ResultOrError<String>> results = new HashMap<>(ids.size()); + for (Uuid id : ids) { + TopicControlInfo topic = topics.get(id, offset); + if (topic == null) { + results.put(id, new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID))); + } else { + results.put(id, new ResultOrError<>(topic.name)); + } + } + return results; + } + + ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) { + Map<Uuid, ApiError> results = new HashMap<>(ids.size()); + List<ApiMessageAndVersion> records = new ArrayList<>(); Review comment: How about setting initial size of `records`? `new ArrayList<>(ids.size())` ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -72,6 +76,9 @@ import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; +import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST; Review comment: At this static member is imported, we can replace all `Errors.INVALID_REQUEST` by `INVALID_REQUEST` in this class. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org