[
https://issues.apache.org/jira/browse/KAFKA-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591815#comment-16591815
]
ASF GitHub Bot commented on KAFKA-5975:
---------------------------------------
hachikuji closed pull request #3960: KAFKA-5975: No response when deleting
topics and delete.topic.enable=false
URL: https://github.com/apache/kafka/pull/3960
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java
b/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java
new file mode 100644
index 00000000000..41577d2a288
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class TopicDeletionDisabledException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public TopicDeletionDisabledException() {
+ }
+
+ public TopicDeletionDisabledException(String message) {
+ super(message);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 090dca32651..d4610602d1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -80,6 +80,7 @@
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
@@ -110,7 +111,7 @@
* Do not add exceptions that occur only on the client or only on the server
here.
*/
public enum Errors {
- UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when
processing the request",
+ UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when
processing the request,",
UnknownServerException::new),
NONE(0, null, message -> null),
OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of
offsets maintained by the server.",
@@ -129,7 +130,7 @@
TimeoutException::new),
BROKER_NOT_AVAILABLE(8, "The broker is not available.",
BrokerNotAvailableException::new),
- REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested
topic-partition",
+ REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested
topic-partition.",
ReplicaNotAvailableException::new),
MESSAGE_TOO_LARGE(10, "The request included a message larger than the max
message size the server will accept.",
RecordTooLargeException::new),
@@ -161,7 +162,7 @@
"The group member's supported protocols are incompatible with
those of existing members" +
" or first group member tried to join with empty protocol type
or empty protocol list.",
InconsistentGroupProtocolException::new),
- INVALID_GROUP_ID(24, "The configured groupId is invalid",
+ INVALID_GROUP_ID(24, "The configured groupId is invalid.",
InvalidGroupIdException::new),
UNKNOWN_MEMBER_ID(25, "The coordinator is not aware of this member.",
UnknownMemberIdException::new),
@@ -171,7 +172,7 @@
InvalidSessionTimeoutException::new),
REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is
needed.",
RebalanceInProgressException::new),
- INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not
valid",
+ INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not
valid.",
InvalidCommitOffsetSizeException::new),
TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.",
TopicAuthorizationException::new),
@@ -207,28 +208,28 @@
UnsupportedForMessageFormatException::new),
POLICY_VIOLATION(44, "Request parameters do not satisfy the configured
policy.",
PolicyViolationException::new),
- OUT_OF_ORDER_SEQUENCE_NUMBER(45, "The broker received an out of order
sequence number",
+ OUT_OF_ORDER_SEQUENCE_NUMBER(45, "The broker received an out of order
sequence number.",
OutOfOrderSequenceException::new),
- DUPLICATE_SEQUENCE_NUMBER(46, "The broker received a duplicate sequence
number",
+ DUPLICATE_SEQUENCE_NUMBER(46, "The broker received a duplicate sequence
number.",
DuplicateSequenceException::new),
INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old
epoch. Either there is a newer producer " +
"with the same transactionalId, or the producer's transaction has
been expired by the broker.",
ProducerFencedException::new),
- INVALID_TXN_STATE(48, "The producer attempted a transactional operation in
an invalid state",
+ INVALID_TXN_STATE(48, "The producer attempted a transactional operation in
an invalid state.",
InvalidTxnStateException::new),
INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producer
id which is not currently assigned to " +
- "its transactional id",
+ "its transactional id.",
InvalidPidMappingException::new),
INVALID_TRANSACTION_TIMEOUT(50, "The transaction timeout is larger than
the maximum value allowed by " +
"the broker (as configured by transaction.max.timeout.ms).",
InvalidTxnTimeoutException::new),
CONCURRENT_TRANSACTIONS(51, "The producer attempted to update a
transaction " +
- "while another concurrent operation on the same transaction
was ongoing",
+ "while another concurrent operation on the same transaction
was ongoing.",
ConcurrentTransactionsException::new),
TRANSACTION_COORDINATOR_FENCED(52, "Indicates that the transaction
coordinator sending a WriteTxnMarker " +
- "is no longer the current coordinator for a given producer",
+ "is no longer the current coordinator for a given producer.",
TransactionCoordinatorFencedException::new),
- TRANSACTIONAL_ID_AUTHORIZATION_FAILED(53, "Transactional Id authorization
failed",
+ TRANSACTIONAL_ID_AUTHORIZATION_FAILED(53, "Transactional Id authorization
failed.",
TransactionalIdAuthorizationException::new),
SECURITY_DISABLED(54, "Security features are disabled.",
SecurityDisabledException::new),
@@ -248,7 +249,7 @@
"removed, the producer's metadata is removed from the broker, and
future appends by the producer will " +
"return this exception.",
UnknownProducerIdException::new),
- REASSIGNMENT_IN_PROGRESS(60, "A partition reassignment is in progress",
+ REASSIGNMENT_IN_PROGRESS(60, "A partition reassignment is in progress.",
ReassignmentInProgressException::new),
DELEGATION_TOKEN_AUTH_DISABLED(61, "Delegation Token feature is not
enabled.",
DelegationTokenDisabledException::new),
@@ -263,19 +264,21 @@
DelegationTokenAuthorizationException::new),
DELEGATION_TOKEN_EXPIRED(66, "Delegation Token is expired.",
DelegationTokenExpiredException::new),
- INVALID_PRINCIPAL_TYPE(67, "Supplied principalType is not supported",
+ INVALID_PRINCIPAL_TYPE(67, "Supplied principalType is not supported.",
InvalidPrincipalTypeException::new),
- NON_EMPTY_GROUP(68, "The group is not empty",
+ NON_EMPTY_GROUP(68, "The group is not empty.",
GroupNotEmptyException::new),
- GROUP_ID_NOT_FOUND(69, "The group id does not exist",
+ GROUP_ID_NOT_FOUND(69, "The group id does not exist.",
GroupIdNotFoundException::new),
- FETCH_SESSION_ID_NOT_FOUND(70, "The fetch session ID was not found",
+ FETCH_SESSION_ID_NOT_FOUND(70, "The fetch session ID was not found.",
FetchSessionIdNotFoundException::new),
- INVALID_FETCH_SESSION_EPOCH(71, "The fetch session epoch is invalid",
+ INVALID_FETCH_SESSION_EPOCH(71, "The fetch session epoch is invalid.",
InvalidFetchSessionEpochException::new),
LISTENER_NOT_FOUND(72, "There is no listener on the leader broker that
matches the listener on which " +
- "metadata request was processed",
- ListenerNotFoundException::new),;
+ "metadata request was processed.",
+ ListenerNotFoundException::new),
+ TOPIC_DELETION_DISABLED(73, "Topic deletion is disabled.",
+ TopicDeletionDisabledException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index dbcc25d37c6..87f14b4aa63 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -51,9 +51,15 @@
*/
private static final Schema DELETE_TOPICS_REQUEST_V2 =
DELETE_TOPICS_REQUEST_V1;
+ /**
+ * The v3 request schema is the same as v2. The response differs based on
the request version.
+ * In v3, a TopicDeletionDisabledException is returned.
+ */
+ private static final Schema DELETE_TOPICS_REQUEST_V3 =
DELETE_TOPICS_REQUEST_V2;
+
public static Schema[] schemaVersions() {
return new Schema[]{DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1,
- DELETE_TOPICS_REQUEST_V2};
+ DELETE_TOPICS_REQUEST_V2, DELETE_TOPICS_REQUEST_V3};
}
private final Set<String> topics;
@@ -121,6 +127,7 @@ public AbstractResponse getErrorResponse(int
throttleTimeMs, Throwable e) {
return new DeleteTopicsResponse(topicErrors);
case 1:
case 2:
+ case 3:
return new DeleteTopicsResponse(throttleTimeMs, topicErrors);
default:
throw new IllegalArgumentException(String.format("Version %d
is not valid. Valid versions for %s are 0 to %d",
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index db1c4343f9e..650caa829ab 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -53,8 +53,15 @@
*/
private static final Schema DELETE_TOPICS_RESPONSE_V2 =
DELETE_TOPICS_RESPONSE_V1;
+ /**
+ * The v3 response schema is the same as v2. The response differs based on
the request version.
+ * In v3, a TopicDeletionDisabledException is returned.
+ */
+ private static final Schema DELETE_TOPICS_RESPONSE_V3 =
DELETE_TOPICS_RESPONSE_V2;
+
public static Schema[] schemaVersions() {
- return new Schema[]{DELETE_TOPICS_RESPONSE_V0,
DELETE_TOPICS_RESPONSE_V1, DELETE_TOPICS_RESPONSE_V2};
+ return new Schema[]{DELETE_TOPICS_RESPONSE_V0,
DELETE_TOPICS_RESPONSE_V1,
+ DELETE_TOPICS_RESPONSE_V2, DELETE_TOPICS_RESPONSE_V3};
}
@@ -65,6 +72,8 @@
* INVALID_TOPIC_EXCEPTION(17)
* TOPIC_AUTHORIZATION_FAILED(29)
* NOT_CONTROLLER(41)
+ * INVALID_REQUEST(42)
+ * TOPIC_DELETION_DISABLED(73)
*/
private final Map<String, Errors> errors;
private final int throttleTimeMs;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 70794714026..0245cbd3695 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -45,6 +45,7 @@
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
@@ -58,6 +59,8 @@
import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.requests.DeleteTopicsRequest;
+import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeAclsResponse;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
@@ -378,6 +381,32 @@ public void testCreateTopicsHandleNotControllerException()
throws Exception {
}
}
+ @Test
+ public void testDeleteTopics() throws Exception {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().setNode(env.cluster().controller());
+
+ env.kafkaClient().prepareResponse(body -> body instanceof
DeleteTopicsRequest,
+ new
DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.NONE)));
+ KafkaFuture<Void> future =
env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
+ new DeleteTopicsOptions()).all();
+ future.get();
+
+ env.kafkaClient().prepareResponse(body -> body instanceof
DeleteTopicsRequest,
+ new
DeleteTopicsResponse(Collections.singletonMap("myTopic",
Errors.TOPIC_DELETION_DISABLED)));
+ future =
env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
+ new DeleteTopicsOptions()).all();
+ TestUtils.assertFutureError(future,
TopicDeletionDisabledException.class);
+
+ env.kafkaClient().prepareResponse(body -> body instanceof
DeleteTopicsRequest,
+ new
DeleteTopicsResponse(Collections.singletonMap("myTopic",
Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+ future =
env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
+ new DeleteTopicsOptions()).all();
+ TestUtils.assertFutureError(future,
UnknownTopicOrPartitionException.class);
+ }
+ }
+
@Test
public void testInvalidTopicNames() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0273e3da557..096ff388bdd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1539,6 +1539,12 @@ class KafkaApis(val requestChannel: RequestChannel,
(topic, Errors.NOT_CONTROLLER)
}.toMap
sendResponseCallback(results)
+ } else if (!config.deleteTopicEnable) {
+ val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST
else Errors.TOPIC_DELETION_DISABLED
+ val results = deleteTopicRequest.topics.asScala.map { topic =>
+ (topic, error)
+ }.toMap
+ sendResponseCallback(results)
} else {
// If no authorized topics return immediately
if (authorizedForDeleteTopics.isEmpty)
diff --git
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
new file mode 100644
index 00000000000..20e30c08a9f
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.network.SocketServer
+import kafka.utils._
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{DeleteTopicsRequest,
DeleteTopicsResponse}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
+
+ override def numBrokers: Int = 1
+
+ override def generateConfigs = {
+ val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+ enableControlledShutdown = false, enableDeleteTopic = false,
+ interBrokerSecurityProtocol = Some(securityProtocol),
+ trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties,
logDirCount = logDirCount)
+ props.foreach(propertyOverrides)
+ props.map(KafkaConfig.fromProps)
+ }
+
+ @Test
+ def testDeleteRecordsRequest() {
+ val topic = "topic-1"
+ val request = new DeleteTopicsRequest.Builder(Set(topic).asJava,
1000).build()
+ val response = sendDeleteTopicsRequest(request)
+ assertEquals(Errors.TOPIC_DELETION_DISABLED, response.errors.get(topic))
+
+ val v2request = new DeleteTopicsRequest.Builder(Set(topic).asJava,
1000).build(2)
+ val v2response = sendDeleteTopicsRequest(v2request)
+ assertEquals(Errors.INVALID_REQUEST, v2response.errors.get(topic))
+ }
+
+ private def sendDeleteTopicsRequest(request: DeleteTopicsRequest,
socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = {
+ val response = connectAndSend(request, ApiKeys.DELETE_TOPICS, socketServer)
+ DeleteTopicsResponse.parse(response, request.version)
+ }
+
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> No response when deleting topics and delete.topic.enable=false
> --------------------------------------------------------------
>
> Key: KAFKA-5975
> URL: https://issues.apache.org/jira/browse/KAFKA-5975
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.11.0.0
> Reporter: Mario Molina
> Assignee: Mario Molina
> Priority: Minor
> Fix For: 2.1.0
>
>
> When trying to delete topics using the KafkaAdminClient while the flag in the
> server config is set to 'delete.topic.enable=false', the client cannot get a
> response and fails with a timeout error. This is because the
> DelayedCreatePartitions object cannot complete the operation.
> This bug fix modifies the handling of the KafkaApi key DELETE_TOPICS to take
> into account that the flag can be disabled, and swallows the error rather than
> passing it to the client; that is, the topic is never removed and no error is
> returned to the client.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)