dengziming commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r550836862
##########
File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala
##########
@@ -621,10 +621,10 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
case Code.NONODE => None
case _ => throw getDataResponse.resultException.get
}
- }.map(_.get)
- .map(topicIdAssignment => (topicIdAssignment.topic,
- topicIdAssignment.topicId.getOrElse(
- throw new IllegalStateException("Topic " + topicIdAssignment.topic +
" does not have a topic ID."))))
+ }.filter(_.isDefined)
Review comment:
nit: getDataResponses.map(xxx).filter(_.isDefined).map(_.get) can be
replaced by getDataResponses.flatMap(xxx)
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1981,29 +1986,39 @@ class KafkaApis(val requestChannel: RequestChannel,
val results = new
DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
val toDelete = mutable.Set[String]()
if (!controller.isActive) {
- deleteTopicRequest.data.topicNames.forEach { topic =>
+ deleteTopicRequest.topics().forEach { topic =>
results.add(new DeletableTopicResult()
- .setName(topic)
+ .setName(topic.name())
+ .setTopicId(topic.topicId())
.setErrorCode(Errors.NOT_CONTROLLER.code))
}
sendResponseCallback(results)
} else if (!config.deleteTopicEnable) {
val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST
else Errors.TOPIC_DELETION_DISABLED
- deleteTopicRequest.data.topicNames.forEach { topic =>
+ deleteTopicRequest.topics().forEach { topic =>
results.add(new DeletableTopicResult()
- .setName(topic)
+ .setName(topic.name())
+ .setTopicId(topic.topicId())
.setErrorCode(error.code))
}
sendResponseCallback(results)
} else {
- deleteTopicRequest.data.topicNames.forEach { topic =>
+ deleteTopicRequest.topics().forEach { topic =>
+ val name = if (topic.name() != null) topic.name()
Review comment:
According to the kip, we will use topicId firstly and topic name will be
used only if topicId is ZERO, but you just use the topic name even topicId is
not ZERO here?
##########
File path: clients/src/main/resources/common/message/DeleteTopicsRequest.json
##########
@@ -23,10 +23,17 @@
//
// Version 5 adds ErrorMessage in the response and may return a
THROTTLING_QUOTA_EXCEEDED error
// in the response if the topics deletion is throttled (KIP-599).
- "validVersions": "0-5",
+ //
+ // Version 6 reorganizes topics, adds topic IDs and allows topic names to be
null.
+ "validVersions": "0-6",
"flexibleVersions": "4+",
"fields": [
- { "name": "TopicNames", "type": "[]string", "versions": "0+",
"entityType": "topicName",
+ { "name": "Topics", "type": "[]DeleteTopicState", "versions": "6+",
"about": "The name or topic ID of the topic",
+ "fields": [
+ {"name": "Name", "type": "string", "versions": "6+",
"nullableVersions": "6+", "about": "The topic name"},
Review comment:
Why is topic name nullable but topicId not? do you mean we will fill all
delete requests with a topicId before sending a request?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1694,6 +1722,79 @@ void handleFailure(Throwable throwable) {
}
};
}
+
+ private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options,
+ final Map<Uuid, KafkaFutureImpl<Void>>
futures,
+ final List<Uuid> topicIds,
+ final Map<Uuid,
ThrottlingQuotaExceededException> quotaExceededExceptions,
+ final long now,
+ final long deadline) {
+ return new Call("deleteTopics", deadline, new
ControllerNodeProvider()) {
+ @Override
+ DeleteTopicsRequest.Builder createRequest(int timeoutMs) {
+ return new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopics(topicIds.stream().map(
+ topic -> new
DeleteTopicState().setTopicId(topic)).collect(Collectors.toList()))
+ .setTimeoutMs(timeoutMs));
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ // Check for controller change
+ handleNotControllerError(abstractResponse);
+ // Handle server responses for particular topics.
+ final DeleteTopicsResponse response = (DeleteTopicsResponse)
abstractResponse;
+ final List<Uuid> retryTopics = new ArrayList<>();
+ final Map<Uuid, ThrottlingQuotaExceededException>
retryTopicQuotaExceededExceptions = new HashMap<>();
+ for (DeletableTopicResult result :
response.data().responses()) {
+ KafkaFutureImpl<Void> future =
futures.get(result.topicId());
+ if (future == null) {
+ log.warn("Server response mentioned unknown topic {}",
result.name());
Review comment:
here we'd better use result.topicId() since we are using topicId.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1896,6 +1896,11 @@ class KafkaApis(val requestChannel: RequestChannel,
.setTopicConfigErrorCode(Errors.NONE.code)
}
}
+ val topicIds =
zkClient.getTopicIdsForTopics(results.asScala.map(result =>
result.name()).toSet)
Review comment:
Here could we use controllerContext since createTopicRep is handled at
the active controller.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
##########
@@ -39,8 +45,25 @@ public Builder(DeleteTopicsRequestData data) {
@Override
public DeleteTopicsRequest build(short version) {
+ if (version >= 6 && data.topicNames().size() != 0) {
+ data.setTopics(groupByTopic(data.topicNames()));
+ } else if (version >= 6) {
+ for (DeleteTopicState topic : data.topics()) {
+ if (topic.name().equals("")) {
Review comment:
if topic.name() == null, a NEP will throw here.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1623,6 +1625,32 @@ public DeleteTopicsResult deleteTopics(final
Collection<String> topicNames,
return new DeleteTopicsResult(new HashMap<>(topicFutures));
}
+ @Override
+ public DeleteTopicsWithIdsResult deleteTopicsWithIds(final
Collection<Uuid> topicIds,
+ final DeleteTopicsOptions options) {
+ final Map<Uuid, KafkaFutureImpl<Void>> topicFutures = new
HashMap<>(topicIds.size());
+ final List<Uuid> validTopicIds = new ArrayList<>(topicIds.size());
+ for (Uuid topicId : topicIds) {
+ if (topicId.equals(Uuid.ZERO_UUID)) {
+ KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+ future.completeExceptionally(new InvalidTopicException("The
given topic ID '" +
Review comment:
Should this be UnknownTopicIdException?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]