hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r587670430



##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -142,6 +148,199 @@ class ControllerApisTest {
       brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testDeleteTopicsByName(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val controller = new MockController.Builder().newInitialTopic("foo", 
fooId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData().setTopicNames(
+      util.Arrays.asList("foo", "bar", "quux", "quux"))
+    val expectedResponse = Set(new DeletableTopicResult().setName("quux").
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."),
+      new DeletableTopicResult().setName("bar").
+        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).
+        setErrorMessage("This server does not host this topic-partition."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId))
+    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      true,
+      _ => Set.empty,
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testDeleteTopicsById(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ")
+    val controller = new MockController.Builder().newInitialTopic("foo", 
fooId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(quuxId))
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(quuxId))
+    val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(quuxId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic id."),
+      new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()).
+        setErrorMessage("This server does not host this topic ID."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      true,
+      _ => Set.empty,
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testInvalidDeleteTopicsRequest(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val bazId = Uuid.fromString("YOS4oQ3UT9eSAZahN1ysSA")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).
+      newInitialTopic("bar", barId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
+    request.topics().add(new 
DeleteTopicState().setName("foo").setTopicId(fooId))
+    request.topics().add(new 
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
+    val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Neither topic name nor id were specified."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("You may not specify both topic name and topic id."),
+      new DeletableTopicResult().setName("bar").setTopicId(barId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("The provided topic name maps to an ID that was 
already supplied."),
+      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."),
+      new DeletableTopicResult().setName(null).setTopicId(bazId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic id."))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      names => names.toSet,
+      names => names.toSet).asScala.toSet)
+  }
+
+  @Test
+  def testNotAuthorizedToDeleteWithTopicExisting(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val bazId = Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")
+    val quuxId = Uuid.fromString("5URoQzW_RJiERVZXJgUVLg")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).
+      newInitialTopic("bar", barId).
+      newInitialTopic("baz", bazId).
+      newInitialTopic("quux", quuxId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new 
DeleteTopicState().setName("baz").setTopicId(ZERO_UUID))
+    request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      _ => Set("foo", "baz"),
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testNotAuthorizedToDeleteWithTopicNotExisting(): Unit = {
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val controller = new MockController.Builder().build()
+    val props = new Properties()
+    props.put(KafkaConfig.DeleteTopicEnableProp, "false")
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new 
DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+    request.topics().add(new 
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+    val expectedResponse = Set(new DeletableTopicResult().setName("foo").
+        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code).
+        setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message),
+      new DeletableTopicResult().setName("bar").
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.UNKNOWN_TOPIC_ID.code).
+        setErrorMessage(Errors.UNKNOWN_TOPIC_ID.message))
+    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      _ => Set("foo"),
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testNotControllerErrorPreventsDeletingTopics(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).build()
+    controller.setActive(false)

Review comment:
       See `QuorumController.QuorumMetaLogListener` for the callbacks that make the controller active or inactive.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to