[GitHub] [kafka] satishd commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-01 Thread GitBox


satishd commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585333787



##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Describes the metadata of a topic partition's remote log segment in the remote storage. This metadata is uniquely
+ * identified by a {@link RemoteLogSegmentId}.
+ * 
+ * A new instance is always created with the state {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}. The state can be
+ * updated by applying a {@link RemoteLogSegmentMetadataUpdate} for the respective {@link RemoteLogSegmentId} of the
+ * {@code RemoteLogSegmentMetadata}.
+ */
+@InterfaceStability.Evolving
+public class RemoteLogSegmentMetadata implements Serializable {

Review comment:
   No, this is not strictly needed. 
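The class Javadoc above says that a new `RemoteLogSegmentMetadata` always starts in `COPY_SEGMENT_STARTED` and is updated by applying a `RemoteLogSegmentMetadataUpdate` for the same `RemoteLogSegmentId`. Below is a minimal plain-Java sketch of that pattern, assuming immutable instances (an update yields a new object). It does not use the real Kafka classes: `SegmentState`, `SegmentMetadata`, `SegmentMetadataUpdate`, and `applyUpdate` are hypothetical names, and the real metadata carries more fields. In line with the reply above, the sketch does not implement `Serializable`.

```java
import java.util.UUID;

class SegmentMetadataSketch {

    // Hypothetical stand-in for the segment life-cycle states.
    enum SegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    // Hypothetical update: names the segment it applies to and the new state.
    record SegmentMetadataUpdate(UUID segmentId, SegmentState state) { }

    // Immutable metadata: applying an update returns a new instance and leaves this one unchanged.
    record SegmentMetadata(UUID segmentId, long startOffset, long endOffset, SegmentState state) {

        // A new instance always starts in COPY_SEGMENT_STARTED.
        static SegmentMetadata create(UUID segmentId, long startOffset, long endOffset) {
            return new SegmentMetadata(segmentId, startOffset, endOffset, SegmentState.COPY_SEGMENT_STARTED);
        }

        // Only updates for the same segment id are accepted.
        SegmentMetadata applyUpdate(SegmentMetadataUpdate update) {
            if (!segmentId.equals(update.segmentId())) {
                throw new IllegalArgumentException("Update targets a different segment: " + update.segmentId());
            }
            return new SegmentMetadata(segmentId, startOffset, endOffset, update.state());
        }
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        SegmentMetadata started = SegmentMetadata.create(id, 0L, 99L);
        SegmentMetadata finished = started.applyUpdate(new SegmentMetadataUpdate(id, SegmentState.COPY_SEGMENT_FINISHED));
        System.out.println(started.state() + " -> " + finished.state());
    }
}
```

Keeping the instances immutable means a reader holding an older snapshot never observes a half-applied update.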





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-01 Thread GitBox


satishd commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585282329



##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface provides storing and fetching of remote log segment metadata with strongly consistent semantics.
+ * 
+ * This class can be plugged in to a Kafka cluster by setting the implementation class as the
+ * remote.log.metadata.manager.class.name property value. There is an inbuilt implementation backed by
+ * topic storage in the local cluster. This is used as the default implementation if
+ * remote.log.metadata.manager.class.name is not configured.
+ * 
+ * 
+ * The remote.log.metadata.manager.class.path property specifies the class path of the RemoteLogMetadataManager
+ * implementation. If specified, the RemoteLogMetadataManager implementation and its dependent libraries will be loaded
+ * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this
+ * parameter is the same as the standard Java class path string.
+ * 
+ * 
+ * The remote.log.metadata.manager.listener.name property specifies the listener name of the local broker to which
+ * the RemoteLogMetadataManager implementation should connect, if it needs to. When this is configured, all other
+ * required properties can be passed as properties with the prefix 'remote.log.metadata.manager.listener.'.
+ * 
+ * "cluster.id", "broker.id" and all other properties prefixed with "remote.log.metadata." are passed when
+ * {@link #configure(Map)} is invoked on this instance.
+ * 
+ */
+@InterfaceStability.Evolving
+public interface RemoteLogMetadataManager extends Configurable, Closeable {
+
+/**
+ * Stores {@link RemoteLogSegmentMetadata} with the containing {@link RemoteLogSegmentId} into {@link RemoteLogMetadataManager}.
+ * 
+ * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId.
+ *
+ * @param remoteLogSegmentMetadata metadata about the remote log segment.
+ * @throws RemoteStorageException if any storage-related errors occur.
+ */
+void putRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
+
+/**
+ * This method is used to update the {@link RemoteLogSegmentMetadata}. Currently, it allows updating the
+ * state based on the life cycle of the segment, which can go through the state transitions below.
+ * 
+ * 
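+ * +---------------------+    +----------------------+
+ * |COPY_SEGMENT_STARTED |--->|COPY_SEGMENT_FINISHED |
+ * +-------------------+-+    +--+-------------------+
+ *                     |         |
+ *                     |         |
+ *                     v         v
+ *                  +--+---------+---------+
+ *                  |DELETE_SEGMENT_STARTED|
+ *                  +----------+-----------+
+ *                             |
+ *                             |
+ *                             v
+ *                  +----------+------------+
+ *                  |DELETE_SEGMENT_FINISHED|
+ *                  +-----------------------+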
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED} - This state indicates that copying the segment to remote storage has started but not yet finished.
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} - This state indicates that copying the segment to remote storage has finished.
+ * 
+ * The leader broker copies the log segments to the remote storage and puts the remote log segment metadata with the
+ * state as 

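The update-method Javadoc in the message above draws the segment life cycle as a small state machine: `COPY_SEGMENT_STARTED` can move to `COPY_SEGMENT_FINISHED` or `DELETE_SEGMENT_STARTED`, `COPY_SEGMENT_FINISHED` can move to `DELETE_SEGMENT_STARTED`, and `DELETE_SEGMENT_STARTED` can move to `DELETE_SEGMENT_FINISHED`. The sketch below encodes those edges in an `EnumMap` so that an implementation could reject any other transition. The enum and the `isValidTransition` helper are hypothetical; the quoted diff is cut off before showing how, or whether, Kafka itself validates transitions.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class SegmentStateTransitions {

    // Hypothetical stand-in for RemoteLogSegmentState.
    enum SegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    // Allowed target states for each source state, taken from the Javadoc diagram.
    private static final Map<SegmentState, Set<SegmentState>> ALLOWED = new EnumMap<>(SegmentState.class);

    static {
        ALLOWED.put(SegmentState.COPY_SEGMENT_STARTED,
                EnumSet.of(SegmentState.COPY_SEGMENT_FINISHED, SegmentState.DELETE_SEGMENT_STARTED));
        ALLOWED.put(SegmentState.COPY_SEGMENT_FINISHED, EnumSet.of(SegmentState.DELETE_SEGMENT_STARTED));
        ALLOWED.put(SegmentState.DELETE_SEGMENT_STARTED, EnumSet.of(SegmentState.DELETE_SEGMENT_FINISHED));
        // DELETE_SEGMENT_FINISHED is terminal: no outgoing transitions.
        ALLOWED.put(SegmentState.DELETE_SEGMENT_FINISHED, EnumSet.noneOf(SegmentState.class));
    }

    // Every state has an entry in ALLOWED, so get() never returns null here.
    static boolean isValidTransition(SegmentState from, SegmentState to) {
        return ALLOWED.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(isValidTransition(SegmentState.COPY_SEGMENT_STARTED, SegmentState.COPY_SEGMENT_FINISHED)); // true
        System.out.println(isValidTransition(SegmentState.COPY_SEGMENT_FINISHED, SegmentState.COPY_SEGMENT_STARTED)); // false
    }
}
```

Keeping the edges in one table makes it easy to compare the check against the diagram.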
[GitHub] [kafka] satishd commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-01 Thread GitBox


satishd commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585331289



##
File path: clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * This represents a universally unique identifier, with a topic id, for a topic partition. This makes sure that topics
+ * recreated with the same name will always have unique topic identifiers.
+ */
+public class TopicIdPartition implements Serializable {

Review comment:
   Sure, I will open a follow-up PR once this class is updated to use `Uuid`.
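The `TopicIdPartition` Javadoc says the class pairs a topic id with a topic partition so that a topic recreated under the same name gets a distinct identity. A minimal sketch of that idea follows, using `java.util.UUID` as the quoted diff does (the reply above expects a switch to Kafka's own `Uuid` type later). The record is a hypothetical stand-in, not the real class, whose fields and accessors may differ.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for TopicIdPartition: identity is (topicId, topic name, partition).
record TopicIdPartitionSketch(UUID topicId, String topic, int partition) {

    TopicIdPartitionSketch {
        Objects.requireNonNull(topicId, "topicId");
        Objects.requireNonNull(topic, "topic");
    }

    public static void main(String[] args) {
        // The same topic name, recreated later, receives a new random id.
        TopicIdPartitionSketch before = new TopicIdPartitionSketch(UUID.randomUUID(), "events", 0);
        TopicIdPartitionSketch after = new TopicIdPartitionSketch(UUID.randomUUID(), "events", 0);
        // Records derive equals/hashCode from all components, so the two are distinct.
        System.out.println(before.equals(after));
    }
}
```

Because the id takes part in equality, data keyed by the old topic cannot be mistaken for data of the recreated one.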









[jira] [Commented] (KAFKA-12393) Document multi-tenancy considerations

2021-03-01 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293471#comment-17293471
 ] 

ASF GitHub Bot commented on KAFKA-12393:


miguno commented on pull request #334:
URL: https://github.com/apache/kafka-site/pull/334#issuecomment-788696584


   @bbejeck wrote in 
https://github.com/apache/kafka-site/pull/334#pullrequestreview-600990037:
   > Also, @miguno, can you create an identical PR to go against docs in AK 
trunk?
   
   Yes, I will do this once the content review of this PR is completed.





> Document multi-tenancy considerations
> -
>
> Key: KAFKA-12393
> URL: https://issues.apache.org/jira/browse/KAFKA-12393
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Michael G. Noll
>Assignee: Michael G. Noll
>Priority: Minor
>
> We should provide an overview of multi-tenancy considerations (e.g., user 
> spaces, security), as the current documentation lacks such information.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-01 Thread GitBox


satishd commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585283720



##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Describes the metadata of a topic partition's remote log segment in the remote storage. This metadata is uniquely
+ * identified by a {@link RemoteLogSegmentId}.
+ * 
+ * A new instance is always created with the state {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}. The state can be
+ * updated by applying a {@link RemoteLogSegmentMetadataUpdate} for the respective {@link RemoteLogSegmentId} of the
+ * {@code RemoteLogSegmentMetadata}.
+ */
+@InterfaceStability.Evolving
+public class RemoteLogSegmentMetadata implements Serializable {

Review comment:
   It is not strictly required. I was using Java serialization earlier (in the POC), before I added Kafka protocol serdes for remote log segment or partition metadata.
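The reply above says `Serializable` was left over from using Java serialization in the proof of concept, before Kafka protocol serdes were added. Kafka's own serdes are not reproduced here. Instead, the sketch below swaps in a hand-written `ByteBuffer` encoding to illustrate the general idea of an explicit, versioned binary format replacing `java.io.Serializable`. The `SegmentRecord` fields, the version byte, and the byte layout are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

class SegmentRecordSerde {

    // Hypothetical metadata record; note that it does not implement Serializable.
    record SegmentRecord(UUID segmentId, String topic, long startOffset, long endOffset) { }

    private static final byte VERSION = 0;

    static byte[] serialize(SegmentRecord r) {
        byte[] topic = r.topic().getBytes(StandardCharsets.UTF_8);
        // Layout: version(1) | id msb(8) | id lsb(8) | topic length(4) | topic bytes | start(8) | end(8)
        ByteBuffer buf = ByteBuffer.allocate(1 + 8 + 8 + 4 + topic.length + 8 + 8);
        buf.put(VERSION);
        buf.putLong(r.segmentId().getMostSignificantBits());
        buf.putLong(r.segmentId().getLeastSignificantBits());
        buf.putInt(topic.length);
        buf.put(topic);
        buf.putLong(r.startOffset());
        buf.putLong(r.endOffset());
        return buf.array();
    }

    static SegmentRecord deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte version = buf.get();
        if (version != VERSION) {
            throw new IllegalArgumentException("Unsupported version: " + version);
        }
        // Java evaluates arguments left to right, so the reads happen in layout order.
        UUID id = new UUID(buf.getLong(), buf.getLong());
        byte[] topic = new byte[buf.getInt()];
        buf.get(topic);
        return new SegmentRecord(id, new String(topic, StandardCharsets.UTF_8), buf.getLong(), buf.getLong());
    }

    public static void main(String[] args) {
        SegmentRecord original = new SegmentRecord(UUID.randomUUID(), "events", 0L, 99L);
        System.out.println(deserialize(serialize(original)).equals(original));
    }
}
```

An explicit version byte lets a reader reject, or later migrate, data written in an unknown format, which Java serialization does not make visible.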









[GitHub] [kafka] satishd commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-01 Thread GitBox


satishd commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585269658



##
File path: clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * This represents a universally unique identifier, with a topic id, for a topic partition. This makes sure that topics
+ * recreated with the same name will always have unique topic identifiers.
+ */
+public class TopicIdPartition implements Serializable {

Review comment:
   Sure, I will open a follow-up PR once this class is updated to use `Uuid`.

##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Describes the metadata of a topic partition's remote log segment in the remote storage. This metadata is uniquely
+ * identified by a {@link RemoteLogSegmentId}.
+ * 
+ * A new instance is always created with the state {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}. The state can be
+ * updated by applying a {@link RemoteLogSegmentMetadataUpdate} for the respective {@link RemoteLogSegmentId} of the
+ * {@code RemoteLogSegmentMetadata}.
+ */
+@InterfaceStability.Evolving
+public class RemoteLogSegmentMetadata implements Serializable {

Review comment:
   It is not strictly required. I was using Java serialization earlier, before I added Kafka protocol serdes for remote log segment or partition metadata.

##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface provides storing and fetching of remote log segment metadata with strongly consistent semantics.
+ * 
+ * This class can be 

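The `RemoteLogMetadataManager` class Javadoc (quoted in full in the earlier message for this file) says that `cluster.id`, `broker.id`, and every property prefixed with `remote.log.metadata.` are passed to `configure(Map)`. A minimal sketch of how a broker might assemble that map is shown below. The helper name `configsForMetadataManager` and the example key `remote.log.metadata.custom.setting` are hypothetical, and the quoted text does not say whether the prefix is stripped from keys; this sketch keeps keys unchanged.

```java
import java.util.HashMap;
import java.util.Map;

class MetadataManagerConfigs {

    static final String PREFIX = "remote.log.metadata.";

    // Hypothetical helper: selects the broker properties that the Javadoc says are passed to configure(Map).
    static Map<String, Object> configsForMetadataManager(Map<String, ?> brokerProps) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> e : brokerProps.entrySet()) {
            String key = e.getKey();
            if (key.equals("cluster.id") || key.equals("broker.id") || key.startsWith(PREFIX)) {
                result.put(key, e.getValue()); // key kept as-is (prefix not stripped)
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> broker = new HashMap<>();
        broker.put("broker.id", 1);
        broker.put("cluster.id", "test-cluster");
        broker.put("remote.log.metadata.custom.setting", "value"); // hypothetical key
        broker.put("log.dirs", "/tmp/kafka-logs");                // unrelated broker setting, filtered out
        System.out.println(configsForMetadataManager(broker).size()); // 3
    }
}
```

Filtering by prefix keeps the plug-in from seeing unrelated broker settings while still letting operators pass arbitrary implementation-specific keys.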
[GitHub] [kafka] chia7712 commented on a change in pull request #10223: KAFKA-12394: Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585322244



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1884,20 +1884,26 @@ class KafkaApis(val requestChannel: RequestChannel,
   val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
 results.asScala.filter(result => result.name() != null))(_.name)
   results.forEach { topic =>
-val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
- if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-   topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-   topic.setErrorMessage("Topic IDs are not supported on the server.")
- } else if (unresolvedTopicId)
- topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-   topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (!authorizedDeleteTopics.contains(topic.name))
-   topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
- else if (!metadataCache.contains(topic.name))
-   topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- else
-   toDelete += topic.name
+val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+  topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+  topic.setErrorMessage("Topic IDs are not supported on the server.")
+} else if (unresolvedTopicId) {
+  topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+} else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+  // Because the client does not have Describe permission, the name should
+  // not be returned in the response. Note, however, that we do not consider
+  // the topicId itself to be sensitive, so there is no reason to obscure
+  // this case with `UNKNOWN_TOPIC_ID`.
+  topic.setName(null)
+  topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+} else if (!authorizedDeleteTopics.contains(topic.name)) {

Review comment:
   According to https://github.com/apache/kafka/pull/10184#discussion_r585086425, should it handle the case `name provided, topic missing, describable => UNKNOWN_TOPIC_OR_PARTITION`?
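The rewritten branch in `KafkaApis` checks conditions in a fixed order and reports the first one that matches. The sketch below restates that order as a plain Java function so the precedence is easy to see. It is a simplification: `TopicRequest`, `Outcome`, and `resolveError` are hypothetical names, `idProvided` stands in for both `topic.topicId() != Uuid.ZERO_UUID` and `topicIdsFromRequest.contains(topic.topicId)`, and the last two checks follow the removed (`-`) lines because the quoted diff is cut off after `!authorizedDeleteTopics.contains(topic.name)`.

```java
class DeleteTopicErrors {

    enum Outcome { UNSUPPORTED_VERSION, UNKNOWN_TOPIC_ID, TOPIC_AUTHORIZATION_FAILED, UNKNOWN_TOPIC_OR_PARTITION, DELETE }

    // Hypothetical, flattened view of one topic entry in a DeleteTopics request.
    record TopicRequest(boolean idProvided, String resolvedName, boolean describeAuthorized,
                        boolean deleteAuthorized, boolean inMetadataCache) { }

    // Checks run in the same order as the Scala branch; the first match wins.
    static Outcome resolveError(TopicRequest t, boolean serverUsesTopicIds) {
        if (!serverUsesTopicIds && t.idProvided()) {
            return Outcome.UNSUPPORTED_VERSION;
        } else if (t.idProvided() && t.resolvedName() == null) {
            return Outcome.UNKNOWN_TOPIC_ID;           // id could not be mapped to a name
        } else if (t.idProvided() && !t.describeAuthorized()) {
            return Outcome.TOPIC_AUTHORIZATION_FAILED; // the new code also clears the name here
        } else if (!t.deleteAuthorized()) {
            return Outcome.TOPIC_AUTHORIZATION_FAILED;
        } else if (!t.inMetadataCache()) {
            return Outcome.UNKNOWN_TOPIC_OR_PARTITION;
        }
        return Outcome.DELETE;
    }

    public static void main(String[] args) {
        // Name given, topic missing, client can describe but not delete.
        TopicRequest t = new TopicRequest(false, "missing-topic", true, false, false);
        System.out.println(resolveError(t, true)); // TOPIC_AUTHORIZATION_FAILED
    }
}
```

With this ordering, a request that names a nonexistent topic from a client without Delete permission gets `TOPIC_AUTHORIZATION_FAILED` whether or not the client can describe the topic, which is the case the review comment raises.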









[GitHub] [kafka] dongjinleekr commented on pull request #10244: KAFKA-12399: Add log4j2 Appender

2021-03-01 Thread GitBox


dongjinleekr commented on pull request #10244:
URL: https://github.com/apache/kafka/pull/10244#issuecomment-788692068


   @omkreddy @huxihx Could you have a look? :smile:







[GitHub] [kafka] dongjinleekr opened a new pull request #10244: KAFKA-12399: Add log4j2 Appender

2021-03-01 Thread GitBox


dongjinleekr opened a new pull request #10244:
URL: https://github.com/apache/kafka/pull/10244


   This PR implements `log4j2-appender`, a log4j2 equivalent of the traditional `log4j-appender`.
   
   All `log4j-appender` configuration properties are supported, with the following changes:
   
   1. `brokerList` is deprecated for consistency with the other CLI tools; `bootstrapServers` is added instead.
   2. `requiredNumAcks` is deprecated for consistency with the `Producer`'s `Properties` configuration; `acks` is added instead.
   3. A new configuration option, `producerClass`, is added. Any `Producer` implementation with the `Properties` argument is supported. The unit test itself uses this feature with `MockProducer`.
   4. In `log4j-appender`, the default values of `retries`, `requiredNumAcks`, `deliveryTimeoutMs`, `lingerMs`, and `batchSize` are redundantly defined in the `Log4jAppender` implementation. Since their default values are already defined in `ProducerConfig`, `log4j2-appender` does not define them again.
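Items 1, 2, and 4 above describe two behaviours: deprecated keys (`brokerList`, `requiredNumAcks`) serve as fallbacks for their replacements (`bootstrapServers`, `acks`), and settings the user did not configure are left out so that the producer's own defaults apply. The sketch below shows one way to do both in plain Java. It is not the appender's actual code: `AppenderSettings`, `withFallback`, `toProducerProperties`, and the choice to let the new key win when both are set are assumptions, and only two settings are mapped.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

class AppenderSettings {

    // Returns the value of the new key, falling back to the deprecated key if only that one is set.
    static Optional<String> withFallback(Map<String, String> attrs, String newKey, String deprecatedKey) {
        String value = attrs.get(newKey);
        if (value == null) {
            value = attrs.get(deprecatedKey);
        }
        return Optional.ofNullable(value);
    }

    // Builds producer properties, adding only settings the user actually configured.
    static Properties toProducerProperties(Map<String, String> attrs) {
        Properties props = new Properties();
        withFallback(attrs, "bootstrapServers", "brokerList")
                .ifPresent(v -> props.setProperty("bootstrap.servers", v));
        withFallback(attrs, "acks", "requiredNumAcks")
                .ifPresent(v -> props.setProperty("acks", v));
        // No defaults are filled in here; unset keys are left to the producer's own defaults.
        return props;
    }

    public static void main(String[] args) {
        Properties props = toProducerProperties(Map.of("brokerList", "localhost:9092"));
        System.out.println(props.getProperty("bootstrap.servers")); // localhost:9092
    }
}
```

Not copying defaults into the appender means a future change to a producer default takes effect without touching the appender.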







[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2021-03-01 Thread Gordon (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293463#comment-17293463
 ] 

Gordon commented on KAFKA-8154:
---

Rushed and grabbed the wrong file, sorry about that. Added the kafka-clients jar, in case that saves you any effort.

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785





[jira] [Created] (KAFKA-12399) Add log4j2 Appender

2021-03-01 Thread Dongjin Lee (Jira)
Dongjin Lee created KAFKA-12399:
---

 Summary: Add log4j2 Appender
 Key: KAFKA-12399
 URL: https://issues.apache.org/jira/browse/KAFKA-12399
 Project: Kafka
  Issue Type: Improvement
  Components: logging
Reporter: Dongjin Lee
Assignee: Dongjin Lee


As a follow-up to KAFKA-9366, we need to provide a log4j2 counterpart to log4j-appender.





[jira] [Commented] (KAFKA-8206) A consumer can't discover new group coordinator when the cluster was partly restarted

2021-03-01 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293424#comment-17293424
 ] 

Ivan Yurchenko commented on KAFKA-8206:
---

Sorry, unfortunately I didn't get around to making a KIP. (BTW, I stopped seeing this issue after an upgrade.)

> A consumer can't discover new group coordinator when the cluster was partly 
> restarted
> -
>
> Key: KAFKA-8206
> URL: https://issues.apache.org/jira/browse/KAFKA-8206
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0, 2.0.0, 2.2.0
>Reporter: alex gabriel
>Priority: Critical
>  Labels: needs-kip
>
> *A consumer can't discover new group coordinator when the cluster was partly 
> restarted*
> Preconditions:
> I use the Kafka server and the Java kafka-clients library, version 2.2.
> I have 2 Kafka nodes running locally (localhost:9092, localhost:9093) and 1 
> ZK (localhost:2181).
> I have replication factor 2 for all my topics and 
> '_unclean.leader.election.enable=true_' on both Kafka nodes.
> Steps to reproduce:
> 1) Start 2 nodes (localhost:9092/localhost:9093)
> 2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093'
> {noformat}
> // discovered group coordinator (0-node)
> 2019-04-09 16:23:18,963 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>
> ...metadatacache is updated (2 nodes in the cluster list)
> 2019-04-09 16:23:18,928 DEBUG 
> [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - 
> [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending 
> metadata request (type=MetadataRequest, topics=) to node localhost:9092 
> (id: -1 rack: null)>
> 2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - 
> Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = 
> P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), 
> localhost:9093 (id: 1 rack: null)], partitions = [], controller = 
> localhost:9092 (id: 0 rack: null))}>
> {noformat}
> 3) Shutdown 1-node (localhost:9093)
> {noformat}
> // metadata was updated to version 4 (but for some reason it still had 2 
> alive nodes in the cluster)
> 2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - 
> Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = 
> P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), 
> localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = 
> events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], 
> offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader 
> = 0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = 
> localhost:9092 (id: 0 rack: null))}>
> // the consumer thinks that node-1 is still alive, tries to send a coordinator 
> lookup to it, and fails
> 2019-04-09 16:23:46,981 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)>
> 2019-04-09 16:23:46,981 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group 
> coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or 
> invalid, will attempt rediscovery>
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.>
> 2019-04-09 16:24:01,117 WARN 
> [org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 
> (localhost:9093) could not be established. Broker may not be available.>
> // refreshing metadata again
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, 
> apiVersion=2, clientId=events-consumer0, correlationId=112) due to node 1 
> being disconnected>
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Coordinator discovery failed, refreshing metadata>
> // metadata 

[GitHub] [kafka] dengziming opened a new pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

2021-03-01 Thread GitBox


dengziming opened a new pull request #10243:
URL: https://github.com/apache/kafka/pull/10243


   *More detailed description of your change*
   The test sometimes fails as follows:
   ![Pasted Graphic](https://user-images.githubusercontent.com/26023240/109610770-286d5080-7b68-11eb-92a5-ab6b45b79c2f.png)
   
   It is better to use `TestUtils.waitUntilTrue` instead of waiting for a fixed 1 second, because 1 second is sometimes too long and sometimes too short.
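The fix above replaces a fixed one-second wait with `TestUtils.waitUntilTrue`, which polls a condition until it holds or a timeout expires. Kafka's own helper is not reproduced here; the following is a hypothetical Java analogue (`waitUntilTrue(BooleanSupplier, long, long, String)` is an assumed signature) that illustrates the polling pattern: it returns as soon as the condition is true and fails only after the full timeout has passed.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class PollingWait {

    // Polls `condition` every `pollMs` until it returns true or `timeoutMs` elapses.
    static void waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs, String message)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return; // condition met: stop waiting immediately
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(message); // timed out
            }
            Thread.sleep(pollMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // The condition becomes true after roughly 200 ms, well before the 15 s timeout.
        waitUntilTrue(() -> System.currentTimeMillis() - start > 200, 15_000, 50, "condition not met");
        System.out.println("condition met");
    }
}
```

A generous timeout costs nothing on fast machines, because the wait ends as soon as the condition holds, while slow CI machines still get enough time.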
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] showuon opened a new pull request #10242: MINOR: format the revoking active log output

2021-03-01 Thread GitBox


showuon opened a new pull request #10242:
URL: https://github.com/apache/kafka/pull/10242


   Adds one missing newline character.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-12398) Fix flaky test `ConsumerBounceTest.testClose`

2021-03-01 Thread dengziming (Jira)
dengziming created KAFKA-12398:
--

 Summary: Fix flaky test `ConsumerBounceTest.testClose`
 Key: KAFKA-12398
 URL: https://issues.apache.org/jira/browse/KAFKA-12398
 Project: Kafka
  Issue Type: Improvement
Reporter: dengziming
Assignee: dengziming
 Attachments: image-2021-03-02-14-22-34-367.png

Sometimes it failed with the following error:

!image-2021-03-02-14-22-34-367.png!





[GitHub] [kafka] lbradstreet opened a new pull request #10241: MINOR: time and log producer state recovery phases

2021-03-01 Thread GitBox


lbradstreet opened a new pull request #10241:
URL: https://github.com/apache/kafka/pull/10241


   During a slow log recovery, it is easy to assume that loading the `.snapshot` 
files is what takes several seconds. Often, however, most of the time goes not 
into loading the snapshot but into rebuilding the producer state from the segment 
files afterwards. This PR times the snapshot-load and segment-recovery phases 
separately to show which one is taking the time.
   
   Example test output:
   ```
   [2021-03-01 22:35:28,129] INFO [Log partition=foo-0, dir=/var/folders/cb/5my51vjd1js380qcr_v245bhgp/T/kafka-16876782135717603479] Reloading from producer snapshot and rebuilding producer state from offset 0
   [2021-03-01 22:35:28,129] INFO [Log partition=foo-0, dir=/var/folders/cb/5my51vjd1js380qcr_v245bhgp/T/kafka-16876782135717603479] Producer state recovery took 0ms for snapshot load and 0ms for segment recovery from offset 0
   [2021-03-01 22:35:28,131] INFO Completed load of Log(dir=/var/folders/cb/5my51vjd1js380qcr_v245bhgp/T/kafka-
   ```
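   Timing the two recovery phases separately can be sketched in Java as follows. This is a minimal, hypothetical helper, not the code in the PR: the class name `RecoveryPhaseTimer`, the injectable `LongSupplier` clock, and modelling each phase as a `Runnable` are assumptions for illustration; only the log-message wording is taken from the example output.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper (not from the PR): times two recovery phases separately.
public class RecoveryPhaseTimer {

    // Runs the snapshot-load phase, then the segment-recovery phase, reading the
    // injected nanosecond clock before, between, and after them.
    static String timePhases(LongSupplier nanoClock, Runnable snapshotLoad,
                             Runnable segmentRecovery, long fromOffset) {
        long start = nanoClock.getAsLong();
        snapshotLoad.run();
        long afterSnapshot = nanoClock.getAsLong();
        segmentRecovery.run();
        long end = nanoClock.getAsLong();

        long snapshotMs = TimeUnit.NANOSECONDS.toMillis(afterSnapshot - start);
        long segmentMs = TimeUnit.NANOSECONDS.toMillis(end - afterSnapshot);
        // Wording mirrors the example log line in the PR description.
        return String.format(
            "Producer state recovery took %dms for snapshot load and %dms for segment recovery from offset %d",
            snapshotMs, segmentMs, fromOffset);
    }

    public static void main(String[] args) {
        // Real clock, no-op phases: prints a message in the same shape as the log line.
        System.out.println(timePhases(System::nanoTime, () -> { }, () -> { }, 0L));
    }
}
```

   Injecting the clock keeps the sketch deterministic under test. Kafka code usually reads time through its `Time` utility rather than calling `System.nanoTime` directly; whether this PR does so is not shown here.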







[jira] [Comment Edited] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2021-03-01 Thread Ori Popowski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293371#comment-17293371
 ] 

Ori Popowski edited comment on KAFKA-8154 at 3/2/21, 5:48 AM:
--

[~gordonmessmer] thank you very much for publishing this JAR.

Edit: this JAR is empty, but I got the idea. I will try to build it myself. Thanks.


was (Author: oripwk):
[~gordonmessmer] thank you very much for publishing this JAR

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785





[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2021-03-01 Thread Ori Popowski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293371#comment-17293371
 ] 

Ori Popowski commented on KAFKA-8154:
-

[~gordonmessmer] thank you very much for publishing this JAR






[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585252099



##
File path: 
clients/src/main/resources/common/message/ListTransactionsResponse.json
##
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 66,
+  "type": "response",
+  "name": "ListTransactionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+  { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+  { "name": "ErrorCode", "type": "int16", "versions": "0+" },
+  { "name": "UnknownStateFilters", "type": "[]string", "default": "null", "versions": "0+", "nullableVersions": "0+",

Review comment:
   The corresponding field in the request is called `StatesFilter`. Could you 
make the naming consistent?

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3303,6 +3304,28 @@ class KafkaApis(val requestChannel: RequestChannel,
   new DescribeTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
 
+  def handleListTransactionsRequest(request: RequestChannel.Request): Unit = {
+val listTransactionsRequest = request.body[ListTransactionsRequest]
+val filteredProducerIds = listTransactionsRequest.data.producerIdFilter.asScala.map(Long.unbox).toSet
+val filteredStates = listTransactionsRequest.data.statesFilter.asScala.toSet
+val response = txnCoordinator.handleListTransactions(filteredProducerIds, filteredStates)
+
+// The response should contain only transactionalIds that the principal
+// has `Describe` permission to access.
+if (response.transactionStates != null) {

Review comment:
   Is this null check necessary? The field is not nullable, so a null value 
here would indicate a bug in the production code.

##
File path: 
clients/src/main/resources/common/message/ListTransactionsResponse.json
##
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 66,
+  "type": "response",
+  "name": "ListTransactionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+  { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+  { "name": "ErrorCode", "type": "int16", "versions": "0+" },
+  { "name": "UnknownStateFilters", "type": "[]string", "default": "null", "versions": "0+", "nullableVersions": "0+",

Review comment:
   Why does it require `nullableVersions`?
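   For context on what `nullableVersions` changes on the wire: in flexible versions, Kafka encodes an array as a compact array, whose length N is written as the unsigned varint N + 1, with 0 reserved for null. The sketch below (hypothetical class name, not Kafka's generated serialization code) shows only that length prefix. Only a field declared nullable for a given version is expected to carry the 0 (null) prefix; otherwise an empty array is sent with prefix 1.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

// Hypothetical sketch of the compact-array length prefix used by flexible versions.
public class CompactArrayLength {

    // Unsigned varint: 7 data bits per byte, high bit set on every byte except the last.
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // A null array is encoded as 0; an array of N elements as N + 1.
    static byte[] lengthPrefix(List<?> array) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(array == null ? 0 : array.size() + 1, out);
        return out.toByteArray();
    }
}
```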

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3303,6 +3304,28 @@ class KafkaApis(val requestChannel: RequestChannel,
   new DescribeTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
 
+  def handleListTransactionsRequest(request: RequestChannel.Request): Unit = {
+val listTransactionsRequest = request.body[ListTransactionsRequest]
+val filteredProducerIds = listTransactionsRequest.data.producerIdFilter.asScala.map(Long.unbox).toSet
+val filteredStates = listTransactionsRequest.data.statesFilter.asScala.toSet
+val response = 

[GitHub] [kafka] chia7712 commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #10203:
URL: https://github.com/apache/kafka/pull/10203#discussion_r585246882



##
File path: build.gradle
##
@@ -1586,15 +1605,17 @@ project(':streams:test-utils') {
   archivesBaseName = "kafka-streams-test-utils"
 
   dependencies {
-compile project(':streams')
-compile project(':clients')
+implementation project(':streams')

Review comment:
   I guess this is similar to `connect-runtime`, which users are likely to 
depend on in their test scope. Maybe we should keep exposing both modules.

##
File path: release.py
##
@@ -631,7 +631,7 @@ def select_gpg_key():
 contents = f.read()
 if not user_ok("Going to build and upload mvn artifacts based on these settings:\n" + contents + '\nOK (y/n)?: '):
 fail("Retry again later")
-cmd("Building and uploading archives", "./gradlewAll uploadArchives", cwd=kafka_dir, env=jdk8_env, shell=True)
+cmd("Building and uploading archives", "./gradlewAll publish", cwd=kafka_dir, env=jdk8_env, shell=True)

Review comment:
   The root `README.md` still uses `./gradlewAll uploadArchives`. It would 
be great to make them consistent.

##
File path: build.gradle
##
@@ -1468,7 +1461,7 @@ project(':streams') {
   include('log4j*jar')
   include('*hamcrest*')
 }
-from (configurations.runtime) {
+from (configurations.runtimeClasspath) {

Review comment:
   Is there a JIRA ticket for this already?









[GitHub] [kafka] chia7712 commented on a change in pull request #10193: MINOR: correct the error message of validating uint32

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #10193:
URL: https://github.com/apache/kafka/pull/10193#discussion_r585237560



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
##
@@ -320,7 +320,7 @@ public Long validate(Object item) {
 if (item instanceof Long)
 return (Long) item;
 else
-throw new SchemaException(item + " is not a Long.");
+throw new SchemaException(item + " is not an unsigned integer.");
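   The corrected message says the value must be an unsigned 32-bit integer. The quoted hunk only checks that the value is a `Long`; the hypothetical sketch below (not Kafka's `Type.java`) adds an illustrative range check to show what "unsigned integer" means for such a field, and throws `IllegalArgumentException` instead of Kafka's `SchemaException` so it stays self-contained.

```java
// Hypothetical sketch: validating a value meant to hold an unsigned 32-bit integer.
public class Uint32Validator {

    // Largest value representable in 32 unsigned bits: 2^32 - 1.
    static final long MAX_UINT32 = 0xFFFFFFFFL;

    static long validate(Object item) {
        if (item instanceof Long) {
            long value = (Long) item;
            if (value >= 0 && value <= MAX_UINT32) {
                return value;
            }
        }
        // Same wording as the corrected error message in the diff.
        throw new IllegalArgumentException(item + " is not an unsigned integer.");
    }
}
```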

Review comment:
   Makes sense. Will do.









[GitHub] [kafka] chia7712 commented on pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…

2021-03-01 Thread GitBox


chia7712 commented on pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#issuecomment-788567775


   @ijuma Thanks for all your great comments. I have updated this PR. Please 
take a look. 







[GitHub] [kafka] chia7712 commented on a change in pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r585235586



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -365,17 +126,92 @@ public int sessionId() {
  * @param partIterator  The partition iterator.
  * @return  The response size in bytes.
  */
-public static  int sizeOf(short version,
- Iterator>> partIterator) {
+public static int sizeOf(short version,
+ Iterator> partIterator) {
 // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
 // use arbitrary values here without affecting the result.
-FetchResponseData data = toMessage(0, Errors.NONE, partIterator, INVALID_SESSION_ID);
+LinkedHashMap data = new LinkedHashMap<>();
+partIterator.forEachRemaining(entry -> data.put(entry.getKey(), entry.getValue()));
 ObjectSerializationCache cache = new ObjectSerializationCache();
-return 4 + data.size(cache, version);
+return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, data).data.size(cache, version);
 }
 
 @Override
 public boolean shouldClientThrottle(short version) {
 return version >= 8;
 }
-}
+
+public static Optional divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.divergingEpoch().epoch() < 0 ? Optional.empty()
+: Optional.of(partitionResponse.divergingEpoch());
+}
+
+public static boolean isDivergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.divergingEpoch().epoch() >= 0;
+}
+
+public static Optional preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.preferredReadReplica() == INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+: Optional.of(partitionResponse.preferredReadReplica());
+}
+
+public static boolean isPreferredReplica(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.preferredReadReplica() != INVALID_PREFERRED_REPLICA_ID;
+}
+
+public static FetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
+return new FetchResponseData.PartitionData()
+.setPartitionIndex(partition)
+.setErrorCode(error.code())
+.setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+}
+
+/**
+ * cast the BaseRecords of PartitionData to Records. KRPC converts the byte array to MemoryRecords so this method
+ * never fail if the data is from KRPC.
+ *
+ * @param partition partition data
+ * @return Records or empty record if the records in PartitionData is null.
+ */
+public static Records records(FetchResponseData.PartitionData partition) {

Review comment:
   Good point. Will do.
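   The helpers in this hunk turn sentinel values in the generated `PartitionData` (a negative diverging epoch, an invalid preferred-replica id) into `Optional`s. A standalone sketch of the same pattern, using plain `int`s instead of Kafka's generated classes; the class name and the value `-1` for the invalid-replica sentinel are assumptions for illustration.

```java
import java.util.Optional;

// Hypothetical sketch of the sentinel-to-Optional pattern used by the FetchResponse helpers.
public class SentinelToOptional {

    // Assumed sentinel value meaning "no preferred read replica".
    static final int INVALID_PREFERRED_REPLICA_ID = -1;

    // Wraps the raw field, mapping the sentinel to Optional.empty().
    static Optional<Integer> preferredReadReplica(int rawReplicaId) {
        return rawReplicaId == INVALID_PREFERRED_REPLICA_ID
            ? Optional.empty()
            : Optional.of(rawReplicaId);
    }

    // Boolean check on the raw field that does not allocate an Optional.
    static boolean isPreferredReplica(int rawReplicaId) {
        return rawReplicaId != INVALID_PREFERRED_REPLICA_ID;
    }

    // As in the hunk, any negative epoch is treated as "no diverging epoch".
    static Optional<Integer> divergingEpoch(int rawEpoch) {
        return rawEpoch < 0 ? Optional.empty() : Optional.of(rawEpoch);
    }
}
```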









[GitHub] [kafka] ableegoldman commented on a change in pull request #10163: KAFKA-10357: Extract setup of changelog from Streams partition assignor

2021-03-01 Thread GitBox


ableegoldman commented on a change in pull request #10163:
URL: https://github.com/apache/kafka/pull/10163#discussion_r583907570



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
+
+public class ChangelogTopics {
+
+private final InternalTopicManager internalTopicManager;
+private final Map topicGroups;
+private final Map> tasksForTopicGroup;
+private final Map> changelogPartitionsForTask = new HashMap<>();
+private final Map> preExistingChangelogPartitionsForTask = new HashMap<>();
+private final Set preExistingNonSourceTopicBasedChangelogPartitions = new HashSet<>();
+private final Set sourceTopicBasedChangelogTopics = new HashSet<>();
+private final Set sourceTopicBasedChangelogTopicPartitions = new HashSet<>();
+private final Logger log;
+
+public ChangelogTopics(final InternalTopicManager internalTopicManager,
+   final Map topicGroups,
+   final Map> tasksForTopicGroup,
+   final String logPrefix) {
+this.internalTopicManager = internalTopicManager;
+this.topicGroups = topicGroups;
+this.tasksForTopicGroup = tasksForTopicGroup;
+final LogContext logContext = new LogContext(logPrefix);
+log = logContext.logger(getClass());
+}
+
+public void setup() {

Review comment:
   Is there a specific reason we need an explicit call to `setup()` rather than 
doing this work in the constructor? I'm always worried that after some 
refactoring we'll forget to call `setup()`, and someone will waste a day 
debugging their code because they tried to use a `ChangelogTopics` object before 
(or without) calling `setup()`.
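   The two options can be sketched as follows. Both classes are hypothetical simplifications (plain `String` task ids and `Integer` partitions instead of `TaskId` and `TopicPartition`, and a trivial copy standing in for the real computation): the first does its work in the constructor, so no instance can exist uninitialized; the second keeps a separate `setup()` but fails fast if a getter is called before it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Option 1: compute everything in the constructor.
class EagerChangelogs {
    private final Map<String, Set<Integer>> partitionsForTask;

    EagerChangelogs(Map<String, Set<Integer>> input) {
        Map<String, Set<Integer>> computed = new HashMap<>();
        input.forEach((task, partitions) -> computed.put(task, Set.copyOf(partitions)));
        this.partitionsForTask = Map.copyOf(computed);
    }

    Set<Integer> partitionsFor(String task) {
        return partitionsForTask.getOrDefault(task, Set.of());
    }
}

// Option 2: keep an explicit setup(), but refuse to serve data before it runs.
class LazyChangelogs {
    private final Map<String, Set<Integer>> input;
    private Map<String, Set<Integer>> partitionsForTask; // null until setup()

    LazyChangelogs(Map<String, Set<Integer>> input) {
        this.input = input;
    }

    void setup() {
        Map<String, Set<Integer>> computed = new HashMap<>();
        input.forEach((task, partitions) -> computed.put(task, Set.copyOf(partitions)));
        partitionsForTask = Map.copyOf(computed);
    }

    Set<Integer> partitionsFor(String task) {
        if (partitionsForTask == null) {
            throw new IllegalStateException("setup() must be called before partitionsFor()");
        }
        return partitionsForTask.getOrDefault(task, Set.of());
    }
}
```

   The constructor variant removes the ordering hazard entirely; the guard variant keeps two-phase initialization but turns a silent misuse into an immediate exception.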

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
+
+public class ChangelogTopics {
+
+private final InternalTopicManager internalTopicManager;
+private final Map topicGroups;
+private final Map> tasksForTopicGroup;
+private final Map> 

[GitHub] [kafka] chia7712 commented on a change in pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r585229860



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -365,17 +126,92 @@ public int sessionId() {
  * @param partIterator  The partition iterator.
  * @return  The response size in bytes.
  */
-public static  int sizeOf(short version,
- Iterator>> partIterator) {
+public static int sizeOf(short version,
+ Iterator> partIterator) {
 // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
 // use arbitrary values here without affecting the result.
-FetchResponseData data = toMessage(0, Errors.NONE, partIterator, INVALID_SESSION_ID);
+LinkedHashMap data = new LinkedHashMap<>();
+partIterator.forEachRemaining(entry -> data.put(entry.getKey(), entry.getValue()));
 ObjectSerializationCache cache = new ObjectSerializationCache();
-return 4 + data.size(cache, version);
+return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, data).data.size(cache, version);
 }
 
 @Override
 public boolean shouldClientThrottle(short version) {
 return version >= 8;
 }
-}
+
+public static Optional divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.divergingEpoch().epoch() < 0 ? Optional.empty()
+: Optional.of(partitionResponse.divergingEpoch());
+}
+
+public static boolean isDivergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.divergingEpoch().epoch() >= 0;
+}
+
+public static Optional preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.preferredReadReplica() == INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+: Optional.of(partitionResponse.preferredReadReplica());
+}
+
+public static boolean isPreferredReplica(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.preferredReadReplica() != INVALID_PREFERRED_REPLICA_ID;
+}
+
+public static FetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
+return new FetchResponseData.PartitionData()
+.setPartitionIndex(partition)
+.setErrorCode(error.code())
+.setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+}
+
+/**
+ * cast the BaseRecords of PartitionData to Records. KRPC converts the byte array to MemoryRecords so this method
+ * never fail if the data is from KRPC.
+ *
+ * @param partition partition data
+ * @return Records or empty record if the records in PartitionData is null.
+ */
+public static Records records(FetchResponseData.PartitionData partition) {
+return partition.records() == null ? MemoryRecords.EMPTY : (Records) partition.records();

Review comment:
   Will do.









[GitHub] [kafka] chia7712 commented on a change in pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r585226663



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -365,17 +126,92 @@ public int sessionId() {
  * @param partIterator  The partition iterator.
  * @return  The response size in bytes.
  */
-public static  int sizeOf(short version,
- Iterator>> partIterator) {
+public static int sizeOf(short version,
+ Iterator> partIterator) {
 // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
 // use arbitrary values here without affecting the result.
-FetchResponseData data = toMessage(0, Errors.NONE, partIterator, INVALID_SESSION_ID);
+LinkedHashMap data = new LinkedHashMap<>();
+partIterator.forEachRemaining(entry -> data.put(entry.getKey(), entry.getValue()));
 ObjectSerializationCache cache = new ObjectSerializationCache();
-return 4 + data.size(cache, version);
+return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, data).data.size(cache, version);
 }
 
 @Override
 public boolean shouldClientThrottle(short version) {
 return version >= 8;
 }
-}
+
+public static Optional divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.divergingEpoch().epoch() < 0 ? Optional.empty()
+: Optional.of(partitionResponse.divergingEpoch());
+}
+
+public static boolean isDivergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.divergingEpoch().epoch() >= 0;
+}
+
+public static Optional preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.preferredReadReplica() == INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+: Optional.of(partitionResponse.preferredReadReplica());
+}
+
+public static boolean isPreferredReplica(FetchResponseData.PartitionData partitionResponse) {
+return partitionResponse.preferredReadReplica() != INVALID_PREFERRED_REPLICA_ID;
+}
+
+public static FetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
+return new FetchResponseData.PartitionData()
+.setPartitionIndex(partition)
+.setErrorCode(error.code())
+.setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+}
+
+/**
+ * cast the BaseRecords of PartitionData to Records. KRPC converts the byte array to MemoryRecords so this method
+ * never fail if the data is from KRPC.

Review comment:
   Good one. Will do.









[GitHub] [kafka] ableegoldman commented on pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-01 Thread GitBox


ableegoldman commented on pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#issuecomment-788556426


   This should be ready for a final review @cadonna @wcarlson5 







[GitHub] [kafka] ableegoldman commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-01 Thread GitBox


ableegoldman commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r585225255



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -463,9 +464,8 @@ private void replaceStreamThread(final Throwable throwable) {
 closeToError();
 }
 final StreamThread deadThread = (StreamThread) Thread.currentThread();
-threads.remove(deadThread);

Review comment:
   Yeah, I think swapping the names would make the code unnecessarily 
complicated, and it would definitely make the logs harder to read.
   Just to note: in the current rebalance protocol, the thread name should not 
affect task assignment, since within a client, tasks are always assigned to 
their previous owner (we maximize stickiness and balance).









[GitHub] [kafka] chia7712 commented on a change in pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r585224829



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -761,79 +754,84 @@ class KafkaApis(val requestChannel: RequestChannel,
 // For fetch requests from clients, check if down-conversion is disabled for the particular partition
 if (!fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
   trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
-  errorResponse(Errors.UNSUPPORTED_VERSION)
+  FetchResponse.partitionResponse(tp.partition, Errors.UNSUPPORTED_VERSION)
 } else {
   try {
 trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
 // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
 // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
 // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
 // client.
-val error = maybeDownConvertStorageError(partitionData.error)
-new FetchResponse.PartitionData[BaseRecords](error, partitionData.highWatermark,
-  partitionData.lastStableOffset, partitionData.logStartOffset,
-  partitionData.preferredReadReplica, partitionData.abortedTransactions,
-  new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+new FetchResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+  .setHighWatermark(partitionData.highWatermark)
+  .setLastStableOffset(partitionData.lastStableOffset)
+  .setLogStartOffset(partitionData.logStartOffset)
+  .setAbortedTransactions(partitionData.abortedTransactions)
+  .setRecords(new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+  .setPreferredReadReplica(partitionData.preferredReadReplica())
   } catch {
 case e: UnsupportedCompressionTypeException =>
   trace("Received unsupported compression type error during down-conversion", e)
-  errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
+  FetchResponse.partitionResponse(tp.partition, Errors.UNSUPPORTED_COMPRESSION_TYPE)
   }
 }
   case None =>
-val error = maybeDownConvertStorageError(partitionData.error)
-new FetchResponse.PartitionData[BaseRecords](error,
-  partitionData.highWatermark,
-  partitionData.lastStableOffset,
-  partitionData.logStartOffset,
-  partitionData.preferredReadReplica,
-  partitionData.abortedTransactions,
-  partitionData.divergingEpoch,
-  unconvertedRecords)
+new FetchResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+  .setHighWatermark(partitionData.highWatermark)
+  .setLastStableOffset(partitionData.lastStableOffset)
+  .setLogStartOffset(partitionData.logStartOffset)
+  .setAbortedTransactions(partitionData.abortedTransactions)
+  .setRecords(unconvertedRecords)
+  .setPreferredReadReplica(partitionData.preferredReadReplica)
+  .setDivergingEpoch(partitionData.divergingEpoch)
 }
   }
 }
 
 // the callback for process a fetch response, invoked before throttling
 def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-  val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+  val partitions = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
   val reassigningPartitions = mutable.Set[TopicPartition]()
   responsePartitionData.foreach { case (tp, data) =>
 val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
 val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-if (data.isReassignmentFetch)
-  reassigningPartitions.add(tp)
-val error = 

[GitHub] [kafka] chia7712 commented on a change in pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r585224204



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -534,15 +588,21 @@ class FetchSessionTest {
   Optional.empty()))
 val session2context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false)
 assertEquals(classOf[FullFetchContext], session2context.getClass)
-val session2RespData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
-session2RespData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
-  Errors.NONE, 100, 100, 100, null, null))
-session2RespData.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
-  Errors.NONE, 10, 10, 10, null, null))
+val session2RespData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
+session2RespData.put(new TopicPartition("foo", 0),
+  new FetchResponseData.PartitionData()
+.setHighWatermark(100)
+.setLastStableOffset(100)
+.setLogStartOffset(100))
+session2RespData.put(new TopicPartition("foo", 1),
+  new FetchResponseData.PartitionData()
+.setHighWatermark(10)

Review comment:
   good catch. will fix all similar issues in next commit.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-01 Thread GitBox


ableegoldman commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r585223909



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
 log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
 timeout = true;
+// Don't remove from threads until shutdown is complete. We will trim it from the
+// list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
+// shutdown then we should just consider this thread.id to be burned
+} else {
+threads.remove(streamThread);
 }
 }
-threads.remove(streamThread);
+// Don't remove from threads until shutdown is complete since this will let another thread
+// reuse its thread.id. We will trim any DEAD threads from the list later
 final long cacheSizePerThread = getCacheSizePerThread(threads.size());

Review comment:
   Good catch. I added a method `getNumLiveStreamThreads` to use instead of 
just `threads.size()` which will trim the list of any DEAD threads and return 
the actual number of living threads
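A minimal sketch of the idea described in this reply, assuming a simplified thread model: `ThreadState`, `FakeStreamThread` and `StreamThreadRegistry` are hypothetical stand-ins for `StreamThread`, `StreamThread.State` and the `threads` list, not the actual `KafkaStreams` code. Counting live threads first trims any DEAD entries, so a thread that is still shutting down keeps its slot (and its `thread.id`) until it really reaches DEAD.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamThread.State (not the real Kafka enum).
enum ThreadState { RUNNING, PENDING_SHUTDOWN, DEAD }

// Hypothetical stand-in for StreamThread: just a name and a mutable state.
final class FakeStreamThread {
    final String name;
    volatile ThreadState state;

    FakeStreamThread(final String name, final ThreadState state) {
        this.name = name;
        this.state = state;
    }
}

final class StreamThreadRegistry {
    private final List<FakeStreamThread> threads = new ArrayList<>();

    synchronized void add(final FakeStreamThread thread) {
        threads.add(thread);
    }

    // Trims DEAD threads from the list, then returns how many remain.
    // A thread that is still shutting down stays in the list (keeping its
    // thread.id reserved) until it actually reaches DEAD.
    synchronized int getNumLiveStreamThreads() {
        threads.removeIf(t -> t.state == ThreadState.DEAD);
        return threads.size();
    }
}
```

Using this count instead of a raw `threads.size()` means a removed thread that timed out in shutdown still counts until it is observed as DEAD and trimmed.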









[GitHub] [kafka] chia7712 commented on a change in pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r585221282



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -1144,8 +1143,14 @@ class AbstractFetcherThreadTest {
   (Errors.NONE, records)
 }
 
-(partition, new FetchData(error, leaderState.highWatermark, leaderState.highWatermark, leaderState.logStartOffset,
-  Optional.empty[Integer], List.empty.asJava, divergingEpoch.asJava, records))
+(partition, new FetchResponseData.PartitionData()
+  .setErrorCode(error.code)
+  .setHighWatermark(leaderState.highWatermark)
+  .setLastStableOffset(leaderState.highWatermark)
+  .setLogStartOffset(leaderState.logStartOffset)
+  .setAbortedTransactions(Collections.emptyList())
+  .setRecords(records)
+  .setDivergingEpoch(divergingEpoch.getOrElse(new FetchResponseData.EpochEndOffset)))

Review comment:
   the type `FetchablePartitionResponse` is gone and the replacement 
`FetchResponseData.PartitionData` (generated data) can't accept `Optional` type.
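A minimal sketch of the workaround in the test change above, assuming a simplified stand-in for a generated message class: `EpochEndOffsetSketch`, its `-1` defaults and the `OptionalToSentinel` helpers are hypothetical, not the real `FetchResponseData.EpochEndOffset`. Because generated setters take plain values rather than `Optional`, an absent value is encoded as a default-constructed object and decoded by comparing against those defaults.

```java
import java.util.Optional;

// Hypothetical stand-in for a generated message class; the -1 defaults are an assumption.
final class EpochEndOffsetSketch {
    int epoch = -1;
    long endOffset = -1L;

    EpochEndOffsetSketch setEpoch(final int epoch) {
        this.epoch = epoch;
        return this;
    }

    EpochEndOffsetSketch setEndOffset(final long endOffset) {
        this.endOffset = endOffset;
        return this;
    }

    boolean isDefault() {
        return epoch == -1 && endOffset == -1L;
    }
}

final class OptionalToSentinel {
    // Writing: an absent Optional becomes a default-constructed object.
    static EpochEndOffsetSketch toField(final Optional<EpochEndOffsetSketch> divergingEpoch) {
        return divergingEpoch.orElseGet(EpochEndOffsetSketch::new);
    }

    // Reading: a field still holding the defaults is treated as "absent".
    static Optional<EpochEndOffsetSketch> fromField(final EpochEndOffsetSketch field) {
        return field.isDefault() ? Optional.empty() : Optional.of(field);
    }
}
```

The tradeoff is that a real value equal to the defaults becomes indistinguishable from "absent", so the sentinel has to be a value that can never occur legitimately.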
   









[GitHub] [kafka] ableegoldman commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-01 Thread GitBox


ableegoldman commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r585220510



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
 log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
 timeout = true;
+// Don't remove from threads until shutdown is complete. We will trim it from the
+// list once it reaches DEAD, and if for some reason it's hanging indefinitely in the

Review comment:
   We're trimming it in `getNextThreadIndex`. But if we're going to rely on 
`threads.size()` elsewhere, which it seems we do, then yeah we should trim it 
more aggressively









[GitHub] [kafka] chia7712 commented on a change in pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…

2021-03-01 Thread GitBox


chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r585219789



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -416,9 +412,8 @@ abstract class AbstractFetcherThread(name: String,
"expected to persist.")
   partitionsWithError += topicPartition
 
-case _ =>
-  error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}",
-partitionData.error.exception)
+case partitionError: Errors =>

Review comment:
   you are right. fixed









[GitHub] [kafka] ableegoldman commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-01 Thread GitBox


ableegoldman commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r585219164



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
 log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
 timeout = true;
+// Don't remove from threads until shutdown is complete. We will trim it from the
+// list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
+// shutdown then we should just consider this thread.id to be burned
+} else {
+threads.remove(streamThread);

Review comment:
   Yeah, I was mainly trying to keep things simple. There's definitely a 
tradeoff in when we resize the cache: either we resize it right away and risk 
an OOM, or we resize it whenever we find newly DEAD threads but potentially have 
to wait to "reclaim" the memory of a thread. 
   Both scenarios run into trouble when a thread is hanging in shutdown, but if 
that occurs something has already gone wrong, so I don't think we need to 
guarantee Streams will continue running perfectly. The downside to resizing 
the cache only once a thread reaches DEAD is that a user could call 
`removeStreamThread()` with a timeout of 0 and then never call add/remove 
thread again, and they'd never get back the memory of the removed thread, since 
we only trim `threads` inside these methods (or the exception handler). In other 
words, it seems OK to lazily remove DEAD threads if we only use the `threads` 
list to find a unique `threadId`, but not to lazily resize the cache. WDYT?
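To make the tradeoff concrete, a minimal sketch assuming the cache budget is split evenly across live stream threads plus one share for a global thread; `CacheSizing.cacheSizePerThread` is a hypothetical helper, not the real `getCacheSizePerThread`. With a 90-unit budget and 3 threads, removing one thread eagerly gives each remaining thread 45; if the removed thread is still counted because nobody has trimmed the list yet, each keeps only 30.

```java
final class CacheSizing {
    // Hypothetical helper: divides the total cache budget evenly across the
    // live stream threads, plus one share for a global thread if present.
    static long cacheSizePerThread(final long totalCacheBytes,
                                   final int numLiveThreads,
                                   final boolean hasGlobalThread) {
        final int shares = numLiveThreads + (hasGlobalThread ? 1 : 0);
        // With no threads at all there is nothing to divide by.
        return shares == 0 ? totalCacheBytes : totalCacheBytes / shares;
    }
}
```

Eager resizing corresponds to passing the post-removal count immediately; lazy resizing corresponds to passing the stale count until DEAD threads are trimmed.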









[GitHub] [kafka] abbccdda commented on pull request #10240: KAFKA-12381: only return leader not available for internal topic creation

2021-03-01 Thread GitBox


abbccdda commented on pull request #10240:
URL: https://github.com/apache/kafka/pull/10240#issuecomment-788548109


   Triggered system tests:
   trunk: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4408/
   fix: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4409/
   to see if we have the verifiable producer test fixed.







[GitHub] [kafka] abbccdda opened a new pull request #10240: KAFKA-12381: only return leader not available for internal topic creation

2021-03-01 Thread GitBox


abbccdda opened a new pull request #10240:
URL: https://github.com/apache/kafka/pull/10240


   We introduced a regression in https://github.com/apache/kafka/pull/9579. 
Originally we returned `INVALID_REPLICATION_FACTOR` only for internal topic 
creation when there were not enough brokers. That PR expanded the scope and 
returned this error for non-internal topic creation as well, where it should 
be `LEADER_NOT_AVAILABLE` instead.
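The rule this PR restores can be sketched as below, assuming a simplified model where the only inputs are whether the topic is internal and whether enough brokers are alive. `AutoCreateError` and `errorFor` are hypothetical names, not Kafka's `Errors` enum or the auto topic creation manager's API, and treating every other case as `LEADER_NOT_AVAILABLE` (creation pending) is a simplification.

```java
// Hypothetical subset of error codes, for illustration only.
enum AutoCreateError { LEADER_NOT_AVAILABLE, INVALID_REPLICATION_FACTOR }

final class AutoCreateErrorSketch {
    // Error reported for a topic that a Metadata request tried to auto-create.
    // Only internal topics surface INVALID_REPLICATION_FACTOR when there are
    // fewer live brokers than the replication factor; other topics report
    // LEADER_NOT_AVAILABLE.
    static AutoCreateError errorFor(final boolean isInternalTopic,
                                    final int liveBrokers,
                                    final int replicationFactor) {
        if (isInternalTopic && liveBrokers < replicationFactor) {
            return AutoCreateError.INVALID_REPLICATION_FACTOR;
        }
        return AutoCreateError.LEADER_NOT_AVAILABLE;
    }
}
```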
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] hachikuji commented on a change in pull request #10223: KAFKA-12394: Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585212982



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
   val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
 results.asScala.filter(result => result.name() != null))(_.name)
   results.forEach { topic =>
-val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
- if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-   topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-   topic.setErrorMessage("Topic IDs are not supported on the server.")
- } else if (unresolvedTopicId)
- topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-   topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (!authorizedDeleteTopics.contains(topic.name))
-   topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
- else if (!metadataCache.contains(topic.name))
-   topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- else
-   toDelete += topic.name
+val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+  topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+  topic.setErrorMessage("Topic IDs are not supported on the server.")
+} else if (unresolvedTopicId) {
+  topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+} else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+  // Because the client does not have Describe permission, the name should
+  // not be returned in the response.
+  topic.setName(null)
+  topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)

Review comment:
   @chia7712 @jolshan I ended up doing this here after all since it sounds 
like there is consensus on not treating the topicId as sensitive based on the 
JIRA discussion. Let me know if you have any concerns.
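A minimal sketch of the per-topic decision order in the `DeleteTopics` handling above, under stated assumptions: each topic has already been reduced to a few booleans, `DeleteTopicError` and `decide` are hypothetical names rather than Kafka's `Errors` or `KafkaApis` code, and the no-Describe branch returns `TOPIC_AUTHORIZATION_FAILED` as the PR title describes (the diff excerpt shown still sets `UNKNOWN_TOPIC_ID` there). The checks run in the same order as the `if`/`else if` chain, so the first failing check determines the error.

```java
// Hypothetical subset of error codes, for illustration only.
enum DeleteTopicError {
    NONE, UNSUPPORTED_VERSION, UNKNOWN_TOPIC_ID, TOPIC_AUTHORIZATION_FAILED, UNKNOWN_TOPIC_OR_PARTITION
}

final class DeleteTopicDecision {
    static DeleteTopicError decide(final boolean serverUsesTopicIds,
                                   final boolean requestedById,
                                   final boolean idResolvedToName,
                                   final boolean canDescribe,
                                   final boolean canDelete,
                                   final boolean topicExists) {
        if (requestedById && !serverUsesTopicIds) {
            return DeleteTopicError.UNSUPPORTED_VERSION;
        }
        if (requestedById && !idResolvedToName) {
            return DeleteTopicError.UNKNOWN_TOPIC_ID;
        }
        if (requestedById && !canDescribe) {
            // The resolved name must not be echoed back in the response.
            return DeleteTopicError.TOPIC_AUTHORIZATION_FAILED;
        }
        if (!canDelete) {
            return DeleteTopicError.TOPIC_AUTHORIZATION_FAILED;
        }
        if (!topicExists) {
            return DeleteTopicError.UNKNOWN_TOPIC_OR_PARTITION;
        }
        return DeleteTopicError.NONE;
    }
}
```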









[GitHub] [kafka] ableegoldman commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-01 Thread GitBox


ableegoldman commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r585107512



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1094,16 +1100,32 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 }
 
 private int getNextThreadIndex() {
-final HashSet names = new HashSet<>();
-processStreamThread(thread -> names.add(thread.getName()));
-final String baseName = clientId + "-StreamThread-";
-for (int i = 1; i <= threads.size(); i++) {
-final String name = baseName + i;
-if (!names.contains(name)) {
-return i;
+final HashSet allLiveThreadNames = new HashSet<>();
+final AtomicInteger maxThreadId = new AtomicInteger(1);

Review comment:
   It's because of the whole "variables used in a lambda must be final or 
effectively final" thing
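A minimal sketch of why the diff reaches for `AtomicInteger`, assuming thread names follow a `<clientId>-StreamThread-<n>` pattern; `ThreadIndexSketch.nextThreadIndex` is a hypothetical helper, not the code under review. A lambda can only capture locals that are final or effectively final, so a running maximum kept in a plain `int` could not be updated inside `forEach`; a final `AtomicInteger` reference whose value changes sidesteps that rule.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadIndexSketch {
    // Hypothetical helper: returns the smallest index i >= 1 such that
    // "<clientId>-StreamThread-<i>" is not taken by any live thread.
    static int nextThreadIndex(final String clientId, final List<String> liveThreadNames) {
        final String baseName = clientId + "-StreamThread-";
        final Set<String> names = new HashSet<>();
        // The reference is final, but its value can still be updated in the lambda.
        final AtomicInteger maxThreadId = new AtomicInteger(0);
        liveThreadNames.forEach(name -> {
            names.add(name);
            if (name.startsWith(baseName)) {
                try {
                    final int id = Integer.parseInt(name.substring(baseName.length()));
                    maxThreadId.set(Math.max(maxThreadId.get(), id));
                } catch (final NumberFormatException ignored) {
                    // Not a "<clientId>-StreamThread-<n>" name; skip it.
                }
            }
        });
        // Reuse the first gap, otherwise take the next index after the maximum.
        for (int i = 1; i <= maxThreadId.get(); i++) {
            if (!names.contains(baseName + i)) {
                return i;
            }
        }
        return maxThreadId.get() + 1;
    }
}
```

A one-element `int[]` would also work around the rule, but `AtomicInteger` states the intent more clearly.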









[GitHub] [kafka] showuon commented on pull request #10228: KAFKA-10251: increase timeout for consumeing records

2021-03-01 Thread GitBox


showuon commented on pull request #10228:
URL: https://github.com/apache/kafka/pull/10228#issuecomment-788534430


   @ableegoldman, to investigate this flaky test, I have a draft PR that 
outputs some debug logs on the Jenkins build. This is the `failed at 1st try` 
run I mentioned in the PR description:
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9724/148/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_15___testWithGroupMetadata__/
   
   I couldn't reproduce it in my local environment, so I reproduced it on 
Jenkins instead. 
   
   Thank you.







[jira] [Created] (KAFKA-12397) Error:KeeperErrorCode = BadVersion for /brokers/topics

2021-03-01 Thread Sachin (Jira)
Sachin created KAFKA-12397:
--

 Summary: Error:KeeperErrorCode = BadVersion for /brokers/topics
 Key: KAFKA-12397
 URL: https://issues.apache.org/jira/browse/KAFKA-12397
 Project: Kafka
  Issue Type: Bug
Reporter: Sachin


We are running ZooKeeper and Kafka on DCOS and are currently observing the 
following errors in the ZooKeeper logs:

INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@892] - Processing mntr command from /100.77.120.5:47676
2021-03-01 06:48:28,223 [myid:2] - INFO [Thread-7313:NIOServerCnxn@1040] - Closed socket connection for client /100.77.120.5:47676 (no session established for client)
2021-03-01 06:48:31,745 [myid:2] - INFO [ProcessThread(sid:2 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x208f3e3caa5 type:setData cxid:0x11772f zxid:0x571be5 txntype:-1 reqpath:n/a Error Path:/brokers/topics/connect-offsets/partitions/8/state Error:KeeperErrorCode = BadVersion for /brokers/topics/connect-offsets/partitions/8/state
2021-03-01 06:48:31,756 [myid:2] - INFO [ProcessThread(sid:2 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x208f3e3caa5 type:setData cxid:0x117731 zxid:0x571be6 txntype:-1 reqpath:n/a Error Path:/brokers/topics/connect-offsets/partitions/20/state Error:KeeperErrorCode = BadVersion for /brokers/topics/connect-offsets/partitions/20/state
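The `BadVersion` lines come from conditional writes. In ZooKeeper, `setData(path, data, version)` succeeds only when `version` matches the znode's current version (passing -1 skips the check), and otherwise fails with `KeeperErrorCode = BadVersion`. A minimal in-memory model of that compare-and-set rule follows; `VersionedStore` and the path used with it are hypothetical and this is not the ZooKeeper client API. It shows why a writer holding a stale version is rejected, not why the versions in this particular cluster went stale.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of versioned znodes; not the ZooKeeper client API.
final class VersionedStore {
    // Unchecked for brevity; ZooKeeper's real BadVersionException is a checked KeeperException.
    static final class BadVersionException extends RuntimeException {
        BadVersionException(final String path) {
            super("BadVersion for " + path);
        }
    }

    private static final class Node {
        byte[] data;
        int version;

        Node(final byte[] data) {
            this.data = data;
            this.version = 0; // new znodes start at version 0
        }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    void create(final String path, final byte[] data) {
        nodes.put(path, new Node(data));
    }

    // Conditional write: succeeds only if expectedVersion equals the current
    // version (-1 skips the check); on success the version is incremented.
    int setData(final String path, final byte[] data, final int expectedVersion) {
        final Node node = nodes.get(path);
        if (node == null) {
            throw new IllegalStateException("No node at " + path);
        }
        if (expectedVersion != -1 && expectedVersion != node.version) {
            throw new BadVersionException(path);
        }
        node.data = data;
        node.version++;
        return node.version;
    }
}
```

A writer that loses this race generally has to re-read the znode to learn the current version before deciding whether to retry.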



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-03-01 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293286#comment-17293286
 ] 

Boyang Chen commented on KAFKA-12381:
-

I checked the 2.7 code, and we only return INVALID_REPLICATION_FACTOR for internal 
topics:
{code:java}
if (isInternal(topic)) {
  val topicMetadata = createInternalTopic(topic)
  if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
    metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
  else
    topicMetadata
} else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
  createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
} else {
  metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList())
}
{code}
This behavior seems to have been lost in the new auto topic creation module. I will 
try to fix it there.

> Incompatible change in verifiable_producer.log in 2.8
> -
>
> Key: KAFKA-12381
> URL: https://issues.apache.org/jira/browse/KAFKA-12381
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Boyang Chen
>Priority: Blocker
>  Labels: kip-500
>
> In test_verifiable_producer.py, we used to see this error message in 
> verifiable_producer.log when a topic couldn't be created:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} 
> (org.apache.kafka.clients.NetworkClient)
> The test does a grep LEADER_NOT_AVAILABLE on the log in this case, and it 
> used to pass.
> Now we are instead seeing this in the log file:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} 
> (org.apache.kafka.clients.NetworkClient)
> And of course now the test fails.
> The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation 
> manager.
> It is a simple matter to make the test pass -- I have confirmed that it 
> passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of 
> LEADER_NOT_AVAILABLE.
> I think we just need to decide if this change in behavior is acceptable or 
> not.





[GitHub] [kafka] junrao commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-01 Thread GitBox


junrao commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585109788



##
File path: clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * This represents universally unique identifier with topic id for a topic partition. This makes sure that topics
+ * recreated with the same name will always have unique topic identifiers.
+ */
+public class TopicIdPartition implements Serializable {

Review comment:
   Could we consolidate this with the TopicIdPartition in BrokersToIsrs?
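As context for the consolidation question, a minimal sketch of what such a value class needs, assuming it only wraps a topic id, a topic name and a partition number. `TopicIdPartitionSketch` is hypothetical, not the class under review or the one in BrokersToIsrs, and it uses `java.util.UUID` only because the diff above imports it. Including the id in `equals`/`hashCode` is what makes a topic deleted and recreated under the same name produce distinct keys.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical immutable key combining a topic id with the topic name and partition.
final class TopicIdPartitionSketch {
    private final UUID topicId;
    private final String topic;
    private final int partition;

    TopicIdPartitionSketch(final UUID topicId, final String topic, final int partition) {
        this.topicId = Objects.requireNonNull(topicId, "topicId");
        this.topic = Objects.requireNonNull(topic, "topic");
        this.partition = partition;
    }

    UUID topicId() { return topicId; }
    String topic() { return topic; }
    int partition() { return partition; }

    // The id takes part in equality, so a recreated topic with the same name
    // yields keys that do not collide with the old topic's keys.
    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicIdPartitionSketch)) return false;
        final TopicIdPartitionSketch that = (TopicIdPartitionSketch) o;
        return partition == that.partition && topicId.equals(that.topicId) && topic.equals(that.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicId, topic, partition);
    }

    @Override
    public String toString() {
        return topicId + ":" + topic + "-" + partition;
    }
}
```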

##
File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * It describes the metadata about a topic partition's remote log segment in the remote storage. This is uniquely
+ * represented with {@link RemoteLogSegmentId}.
+ * 
+ * New instance is always created with the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}. This can be
+ * updated by applying {@link RemoteLogSegmentMetadataUpdate} for the respective {@link RemoteLogSegmentId} of the
+ * {@code RemoteLogSegmentMetadata}.
+ */
+@InterfaceStability.Evolving
+public class RemoteLogSegmentMetadata implements Serializable {

Review comment:
   We typically don't use java serialization. Is Serializable needed? Ditto 
in a few other classes.
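The javadoc above describes a lifecycle in which a new instance always starts in `COPY_SEGMENT_STARTED` and later states come from applying an update for the same segment id. A minimal sketch of that pattern, assuming a bare `UUID` as the segment id and a single state field; all names are hypothetical simplifications, not the KIP-405 classes, and the three states besides `COPY_SEGMENT_STARTED` are assumptions modeled on KIP-405. Returning a new instance from `applyUpdate` is a design choice of this sketch. It also omits `Serializable`, since nothing here relies on Java serialization.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical lifecycle states; only COPY_SEGMENT_STARTED is named in the javadoc.
enum RemoteSegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

// Hypothetical update carrying the target segment id and its new state.
final class SegmentStateUpdate {
    final UUID segmentId;
    final RemoteSegmentState state;

    SegmentStateUpdate(final UUID segmentId, final RemoteSegmentState state) {
        this.segmentId = Objects.requireNonNull(segmentId, "segmentId");
        this.state = Objects.requireNonNull(state, "state");
    }
}

// Hypothetical, simplified metadata holder: immutable, so updates return a new instance.
final class SegmentMetadataSketch {
    private final UUID segmentId;
    private final RemoteSegmentState state;

    // A new instance always starts in COPY_SEGMENT_STARTED.
    SegmentMetadataSketch(final UUID segmentId) {
        this(segmentId, RemoteSegmentState.COPY_SEGMENT_STARTED);
    }

    private SegmentMetadataSketch(final UUID segmentId, final RemoteSegmentState state) {
        this.segmentId = Objects.requireNonNull(segmentId, "segmentId");
        this.state = state;
    }

    UUID segmentId() { return segmentId; }
    RemoteSegmentState state() { return state; }

    // Applies an update for the same segment id; updates for any other id are rejected.
    SegmentMetadataSketch applyUpdate(final SegmentStateUpdate update) {
        if (!segmentId.equals(update.segmentId)) {
            throw new IllegalArgumentException("Update is for segment " + update.segmentId + ", not " + segmentId);
        }
        return new SegmentMetadataSketch(segmentId, update.state);
    }
}
```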

##
File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentId.java
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * This class represents a universally unique identifier associated to a topic partition's log segment. This will be
+ * regenerated for every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegmentData(RemoteLogSegmentMetadata, LogSegmentData)}.
+ * Once it is 

[jira] [Commented] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-03-01 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293264#comment-17293264
 ] 

Jason Gustafson commented on KAFKA-12381:
-

[~bchen225242] I would not focus too much on the system test failure. The main 
thing to verify is whether there is a behavior change here in the `Metadata` 
response. If we are returning `INVALID_REPLICATION_FACTOR` in some case where 
we were previously returning `LEADER_NOT_AVAILABLE`, then it is likely a 
regression. 



[jira] [Comment Edited] (KAFKA-12359) Update Jetty to 11

2021-03-01 Thread John Stacy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293247#comment-17293247
 ] 

John Stacy edited comment on KAFKA-12359 at 3/1/21, 11:49 PM:
--

Due to this vulnerability (CVE-2020-27223), you might want to bump to 11.0.1: 
[https://github.com/eclipse/jetty.project/security/advisories/GHSA-m394-8rww-3jr7]


was (Author: jrstacy):
Due to this vulnerability, you might want to bump to 11.0.1: 
https://github.com/eclipse/jetty.project/security/advisories/GHSA-m394-8rww-3jr7

> Update Jetty to 11
> --
>
> Key: KAFKA-12359
> URL: https://issues.apache.org/jira/browse/KAFKA-12359
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect, tools
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>
> I found this problem when I was working on 
> [KAFKA-12324|https://issues.apache.org/jira/browse/KAFKA-12324].
> At present, Kafka Connect and Trogdor use Jetty 9. Although Jetty's 
> stable release is 9.4, the Jetty community is now shifting its focus to Jetty 
> 10 and 11, which require Java 11 as a prerequisite. To minimize potential 
> security vulnerabilities, Kafka should migrate to Java 11 + Jetty 11 as soon 
> as Jetty 9.4 reaches end of life. As a note, 
> [Jetty 9.2 reached End of Life in March 2018|https://www.eclipse.org/lists/jetty-announce/msg00116.html], 
> and 9.3 also did in [February 2020|https://www.eclipse.org/lists/jetty-announce/msg00140.html].
> In other words, the necessity of moving to Java 11 is heavily affected by 
> Jetty's maintenance plan. Jetty 9.4 seems likely to remain supported for some 
> time, but it is worth being aware of these relationships and having a 
> migration plan.
> Updating Jetty to 11 is not as simple as changing the version. Along with its 
> API changes, we have to cope with additional dependencies, 
> [Java EE class name changes|https://webtide.com/renaming-from-javax-to-jakarta/], 
> making Jackson compatible with the changes, etc.
> As a note: for the difference between Jetty 10 and 11, see 
> [here|https://webtide.com/jetty-10-and-11-have-arrived/] - in short, "Jetty 
> 11 is identical to Jetty 10 except that the javax.* packages now conform to 
> the new jakarta.* namespace.".





[jira] [Commented] (KAFKA-12359) Update Jetty to 11

2021-03-01 Thread John Stacy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293247#comment-17293247
 ] 

John Stacy commented on KAFKA-12359:


Due to this vulnerability, you might want to bump to 11.0.1: 
https://github.com/eclipse/jetty.project/security/advisories/GHSA-m394-8rww-3jr7



[jira] [Commented] (KAFKA-12394) Consider topic id existence and authorization errors

2021-03-01 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293235#comment-17293235
 ] 

Jason Gustafson commented on KAFKA-12394:
-

The specific case where this came up was `DeleteTopic` handling. It sounds like 
there is basic agreement to use `TOPIC_AUTHORIZATION_FAILED`. Unless there are 
objections, I will go ahead and merge this into the minor PR here: 
https://github.com/apache/kafka/pull/10223.

> Consider topic id existence and authorization errors
> 
>
> Key: KAFKA-12394
> URL: https://issues.apache.org/jira/browse/KAFKA-12394
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> We have historically had logic in the api layer to avoid leaking the 
> existence or non-existence of topics to clients which are not authorized to 
> describe them. The way we have done this is to always authorize the topic 
> name first before checking existence.
> Topic ids make this more difficult because the resource (i.e. the topic name) 
> has to be derived. This means we have to check existence of the topic first. 
> If the topic does not exist, then our hands are tied and we have to return 
> UNKNOWN_TOPIC_ID. If the topic does exist, then we need to check if the 
> client is authorized to describe it. The question then is what we should 
> do if the client is not authorized.
> The current behavior is to return UNKNOWN_TOPIC_ID. The downside is that this 
> is misleading and forces the client to retry even though they are doomed to 
> hit the same error. However, the client should generally handle this by 
> requesting Metadata using the topic name that they are interested in, which 
> would give them a chance to see the topic authorization error. Basically the 
> fact that you need describe permission in the first place to discover the 
> topic id makes this an unlikely scenario.
> There is an argument to be made for TOPIC_AUTHORIZATION_FAILED as well. 
> Basically we could take the stance that we do not care about leaking the 
> existence of topic IDs since they do not reveal anything about the underlying 
> topic. Additionally, there is little likelihood of a user discovering a valid 
> UUID by accident or even through brute force. The benefit of this is that 
> users get a clear error for cases where a topic ID may have been discovered 
> through some external means. For example, an administrator finds a topic ID 
> in the logging and attempts to delete it using the new `deleteTopicsWithIds` 
> Admin API.
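The two options above, together with the existing "authorize the name first" rule, can be summarized as a small pure function. This is a minimal sketch only: the class, the local `TopicError` enum and the `revealIdExistence` switch are hypothetical and are not Kafka's `KafkaApis` code. The switch stands for the choice being debated (hide an existing but undescribable ID behind `UNKNOWN_TOPIC_ID`, or report `TOPIC_AUTHORIZATION_FAILED`), and the name-based branch assumes `UNKNOWN_TOPIC_OR_PARTITION` for a describable but missing topic.

```java
import java.util.Optional;

final class DeleteTopicErrorSketch {

    // Local stand-ins for the protocol error names used in the discussion (not the real Errors enum).
    enum TopicError { NONE, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION, TOPIC_AUTHORIZATION_FAILED }

    /**
     * Topic referenced by ID: the name (the ACL resource) must be resolved first,
     * so existence is necessarily checked before authorization.
     *
     * @param resolvedName      name the ID maps to, or empty if no such topic exists
     * @param revealIdExistence hypothetical switch: true returns TOPIC_AUTHORIZATION_FAILED for an
     *                          existing but undescribable topic, false hides it as UNKNOWN_TOPIC_ID
     */
    static TopicError forTopicId(Optional<String> resolvedName, boolean canDescribe,
                                 boolean canDelete, boolean revealIdExistence) {
        if (!resolvedName.isPresent()) {
            return TopicError.UNKNOWN_TOPIC_ID;        // nothing to authorize against
        }
        if (!canDescribe) {
            return revealIdExistence ? TopicError.TOPIC_AUTHORIZATION_FAILED
                                     : TopicError.UNKNOWN_TOPIC_ID;
        }
        return canDelete ? TopicError.NONE : TopicError.TOPIC_AUTHORIZATION_FAILED;
    }

    /** Topic referenced by name: DESCRIBE is checked before existence, so "missing" is never leaked. */
    static TopicError forTopicName(boolean exists, boolean canDescribe, boolean canDelete) {
        if (!canDescribe) {
            return TopicError.TOPIC_AUTHORIZATION_FAILED;  // same answer whether or not the topic exists
        }
        if (!exists) {
            return TopicError.UNKNOWN_TOPIC_OR_PARTITION;
        }
        return canDelete ? TopicError.NONE : TopicError.TOPIC_AUTHORIZATION_FAILED;
    }
}
```

Keeping the policy in one place like this makes the ID-based and name-based paths easy to compare side by side. It also makes the "which error leaks what" question testable without mocking an authorizer.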





[GitHub] [kafka] hachikuji commented on pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-01 Thread GitBox


hachikuji commented on pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#issuecomment-788392877


   @chia7712 I implemented your suggestion to include the unknown states in the 
response. Let me know how this looks to you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585128257



##
File path: 
clients/src/main/resources/common/message/ListTransactionsResponse.json
##
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 66,
+  "type": "response",
+  "name": "ListTransactionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+  { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+    "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+  { "name": "ErrorCode", "type": "int16", "versions": "0+" },
+  { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
+    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId" },
+    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
+    { "name": "TransactionState", "type": "string", "versions": "0+" }

Review comment:
   I added an `about` tag for `TransactionState`. I think the `entityType` 
tag is enough for the other two fields.









[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585123307



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+    filterProducerIds: Set[Long],
+    filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+    inReadLock(stateLock) {
+      if (loadingPartitions.nonEmpty) {
+        Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+      } else {
+        val filterStates = filterStateNames.flatMap(TransactionState.fromName)

Review comment:
   You can see `listConsumerGroups` to see how this was handled in the 
analogous case for consumer groups.
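One subtlety in the quoted line is that `flatMap(TransactionState.fromName)` silently drops unknown state names, so a request filtering only on unknown states could end up with an empty filter and, if emptiness is treated as "no filter", match everything. A minimal sketch of a filter that avoids this and reports unknown names back to the caller; the `TxnState` enum is an illustrative subset and `Txn`, `Result` and `list` are hypothetical stand-ins, not the coordinator's types:

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class ListTransactionsFilterSketch {

    // Illustrative subset of coordinator states; each constant carries its wire-level name.
    enum TxnState {
        EMPTY("Empty"), ONGOING("Ongoing"), PREPARE_COMMIT("PrepareCommit"),
        PREPARE_ABORT("PrepareAbort"), COMPLETE_COMMIT("CompleteCommit"), COMPLETE_ABORT("CompleteAbort");

        final String stateName;

        TxnState(String stateName) {
            this.stateName = stateName;
        }

        static Optional<TxnState> fromName(String name) {
            for (TxnState s : values()) {
                if (s.stateName.equals(name)) {
                    return Optional.of(s);
                }
            }
            return Optional.empty();
        }
    }

    static final class Txn {
        final String transactionalId;
        final long producerId;
        final TxnState state;

        Txn(String transactionalId, long producerId, TxnState state) {
            this.transactionalId = transactionalId;
            this.producerId = producerId;
            this.state = state;
        }
    }

    static final class Result {
        final List<Txn> matches = new ArrayList<>();
        final List<String> unknownStateFilters = new ArrayList<>(); // echoed back to the client
    }

    /** An empty filter set means "no filtering" on that dimension. */
    static Result list(List<Txn> all, Set<Long> filterProducerIds, Set<String> filterStateNames) {
        Result result = new Result();
        Set<TxnState> filterStates = EnumSet.noneOf(TxnState.class);
        for (String name : filterStateNames) {
            Optional<TxnState> state = TxnState.fromName(name);
            if (state.isPresent()) {
                filterStates.add(state.get());
            } else {
                result.unknownStateFilters.add(name); // reported, not silently dropped
            }
        }
        for (Txn txn : all) {
            boolean producerMatches = filterProducerIds.isEmpty() || filterProducerIds.contains(txn.producerId);
            // Decided by the *requested* names, so a filter of only unknown names matches nothing.
            boolean stateMatches = filterStateNames.isEmpty() || filterStates.contains(txn.state);
            if (producerMatches && stateMatches) {
                result.matches.add(txn);
            }
        }
        return result;
    }
}
```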









[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-01 Thread GitBox


ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r585070038



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   Just to clarify, do you mean we don't need to rejoin here since we will 
always raise an error, and always rejoin (if necessary) when checking that 
error?
   
   Or are you referring to the `requestRejoinOnResponseError` calls you added 
to the last two cases in the if/else below?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -961,29 +962,34 @@ protected synchronized String memberId() {
         return generation.memberId;
     }
 
-    private synchronized void resetState() {
+    private synchronized void resetState(final String reason) {

Review comment:
   nit: rename to `resetStateAndGeneration`?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -802,8 +801,10 @@ public void handle(SyncGroupResponse syncResponse,
 log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
  error.message(), sentGeneration);
 markCoordinatorUnknown(error);
+requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error);

Review comment:
   Why do we explicitly rejoin in this case, but not for, e.g., 
`REBALANCE_IN_PROGRESS` or `UNKNOWN_MEMBER_ID`/`ILLEGAL_GENERATION`?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -961,29 +962,34 @@ protected synchronized String memberId() {
         return generation.memberId;
     }
 
-    private synchronized void resetState() {
+    private synchronized void resetState(final String reason) {
+        log.info("Resetting generation due to: {}", reason);
+
         state = MemberState.UNJOINED;
         generation = Generation.NO_GENERATION;
     }
 
-    private synchronized void resetStateAndRejoin() {
-        resetState();
-        rejoinNeeded = true;
+    private synchronized void resetStateAndRejoin(final String reason) {
+        resetState(reason);
+        requestRejoin(reason);
     }
 
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
-        log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);

Review comment:
   SGTM. If we find it flooding the logs and not helpful we can reconsider
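The change above threads a human-readable reason through every reset and rejoin path, so each state change is logged together with its cause. Below is a deliberately simplified, hypothetical model of that pattern, not `AbstractCoordinator` itself: the generation is a plain `int`, a `List<String>` stands in for the logger, and the idea that `requestRejoin(String)` records the reason (and the wording of its log line) is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class RejoinReasonSketch {

    enum MemberState { UNJOINED, STABLE }

    static final int NO_GENERATION = -1;

    private MemberState state = MemberState.STABLE; // pretend the member is currently in a group
    private int generationId = 5;                   // arbitrary current generation
    private boolean rejoinNeeded = false;
    private String rejoinReason = "";

    // Stand-in for the coordinator's logger so the sketch has no logging dependency.
    final List<String> log = new ArrayList<>();

    synchronized void resetState(final String reason) {
        log.add("Resetting generation due to: " + reason);
        state = MemberState.UNJOINED;
        generationId = NO_GENERATION;
    }

    synchronized void requestRejoin(final String reason) {
        log.add("Requesting rejoin due to: " + reason); // hypothetical message text
        rejoinReason = reason;
        rejoinNeeded = true;
    }

    // Both steps share one reason, so the two log lines can be correlated.
    synchronized void resetStateAndRejoin(final String reason) {
        resetState(reason);
        requestRejoin(reason);
    }

    synchronized boolean rejoinNeeded() { return rejoinNeeded; }
    synchronized MemberState state() { return state; }
    synchronized int generationId() { return generationId; }
    synchronized String rejoinReason() { return rejoinReason; }
}
```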









[GitHub] [kafka] ableegoldman commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-01 Thread GitBox


ableegoldman commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r585107512



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1094,16 +1100,32 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
     }
 
     private int getNextThreadIndex() {
-        final HashSet<String> names = new HashSet<>();
-        processStreamThread(thread -> names.add(thread.getName()));
-        final String baseName = clientId + "-StreamThread-";
-        for (int i = 1; i <= threads.size(); i++) {
-            final String name = baseName + i;
-            if (!names.contains(name)) {
-                return i;
+        final HashSet<String> allLiveThreadNames = new HashSet<>();
+        final AtomicInteger maxThreadId = new AtomicInteger(1);
Review comment:
   It's because variables used in a lambda must be "final or effectively 
final", whatever that means.
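"Effectively final" means a local variable that is never reassigned after it is initialized; Java lets lambdas capture only such variables. The usual workaround, and the one the diff uses for `maxThreadId`, is to keep the reference final and mutate a holder such as `AtomicInteger`. A minimal sketch, where `forEachName` is a hypothetical stand-in for a callback-style helper like `processStreamThread`:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class EffectivelyFinalSketch {

    // Hypothetical stand-in for a callback-style iterator such as processStreamThread(...).
    static void forEachName(List<String> names, Consumer<String> action) {
        names.forEach(action);
    }

    static int longestNameLength(List<String> names) {
        // With "int max = 0;" the statement "max = Math.max(max, name.length())" inside the
        // lambda would not compile: local variables referenced from a lambda expression
        // must be final or effectively final.
        final AtomicInteger max = new AtomicInteger(0); // the reference is final; its value is mutable
        forEachName(names, name -> max.updateAndGet(current -> Math.max(current, name.length())));
        return max.get();
    }
}
```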









[GitHub] [kafka] ableegoldman commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-01 Thread GitBox


ableegoldman commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r585107230



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1094,16 +1100,32 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
     }
 
     private int getNextThreadIndex() {
-        final HashSet<String> names = new HashSet<>();
-        processStreamThread(thread -> names.add(thread.getName()));
-        final String baseName = clientId + "-StreamThread-";
-        for (int i = 1; i <= threads.size(); i++) {
-            final String name = baseName + i;
-            if (!names.contains(name)) {
-                return i;
+        final HashSet<String> allLiveThreadNames = new HashSet<>();
+        final AtomicInteger maxThreadId = new AtomicInteger(1);
+        synchronized (threads) {
+            processStreamThread(thread -> {
+                // trim any DEAD threads from the list so we can reuse the thread.id
+                // this is only safe to do once the thread has fully completed shutdown
+                if (thread.state() == StreamThread.State.DEAD) {
+                    threads.remove(thread);
+                } else {
+                    allLiveThreadNames.add(thread.getName());
+                    final int threadId = thread.getName().charAt(thread.getName().length() - 1);

Review comment:
   Oh duh. That would have been an annoying bug to find in production  
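The problem with the quoted line is that `String.charAt` returns a `char`, and assigning it to an `int` yields its UTF-16 code unit rather than its digit value (`'1'` becomes 49); it would also read only the last digit of a two-digit index. A minimal sketch of the difference, assuming thread names follow the `clientId + "-StreamThread-" + i` pattern from the removed code; `parseThreadIndex` is a hypothetical helper, not necessarily the fix that was merged:

```java
final class ThreadIndexSketch {

    /** The quoted variant: returns the character code of the last character. */
    static int lastCharAsInt(String threadName) {
        final int threadId = threadName.charAt(threadName.length() - 1);
        return threadId;
    }

    /** Parses everything after the last '-' as a decimal index, e.g. "app-StreamThread-12" -> 12. */
    static int parseThreadIndex(String threadName) {
        int dash = threadName.lastIndexOf('-');
        if (dash < 0 || dash == threadName.length() - 1) {
            throw new IllegalArgumentException("No numeric suffix in thread name: " + threadName);
        }
        // Integer.parseInt throws NumberFormatException if the suffix is not a number.
        return Integer.parseInt(threadName.substring(dash + 1));
    }
}
```

Using `lastIndexOf` rather than a fixed offset keeps the parse correct even when the client ID itself contains dashes.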









[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585027051



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -3479,6 +3481,161 @@ class KafkaApisTest {
     assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList)
   }
 
+  @Test
+  def testDeleteTopicsByIdAuthorization(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+    EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED
+    ))
+
+    val topicIdsMap = Map(
+      Uuid.randomUuid() -> Some("foo"),
+      Uuid.randomUuid() -> Some("bar"),
+      Uuid.randomUuid() -> None
+    )
+
+    topicIdsMap.foreach { case (topicId, topicNameOpt) =>
+      EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
+    }
+
+    val topicDatas = topicIdsMap.keys.map { topicId =>
+      new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
+    }.toList
+    val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+      .setTopics(topicDatas.asJava))
+      .build(ApiKeys.DELETE_TOPICS.latestVersion)
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, controllerContext, authorizer)
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    topicIdsMap.foreach { case (topicId, nameOpt) =>
+      val response = deleteResponse.data.responses.asScala.find(_.topicId == topicId).get
+      nameOpt match {
+        case Some("foo") =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case Some("bar") =>
+          assertEquals("bar", response.name)
+          assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode))
+        case None =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case _ =>
+          fail("Unexpected topic id/name mapping")
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    val deleteRequest = if (usePrimitiveTopicNameArray) {
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopicNames(List("foo", "bar", "baz").asJava))
+        .build(5.toShort)
+    } else {
+      val topicDatas = List(
+        new DeleteTopicsRequestData.DeleteTopicState().setName("foo"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("bar"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("baz")
+      )
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopics(topicDatas.asJava))
+

[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585086425



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,17 +162,179 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+      request.context.apiVersion,
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator)).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error.code).
+        setErrorMessage(error.message))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames.forEach(maybeAppendToTopicNamesToResolve)
+
+    request.topics.forEach {
+      topic => if (topic.name == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId)
+        }
+      } else {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name)
+        } else {
+          appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+    val unknownTopicNameErrors = new util.HashMap[String, ApiError]
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+        appendResponse(name, id, new ApiError(INVALID_REQUEST,
+          "The same topic was specified by name and by id."))
+        idToName.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().forEach { (name, idOrError) =>
+      if (idOrError.isError)
+        unknownTopicNameErrors.put(name, idOrError.error)
+      else
+        maybeAppendToIdToName(idOrError.result, name)
+    }
+
+    /**
+     * There are 6 error cases to handle here if we don't have permission to delete:
+     *
+     * 1. ID provided, topic missing => UNKNOWN_TOPIC_ID
+     * 2. ID provided, topic present, describeable => TOPIC_AUTHORIZATION_FAILED
+     * 3. ID provided, topic present, undescribeable => UNKNOWN_TOPIC_ID
+     * 4. name provided, topic missing, undescribable => UNKNOWN_TOPIC_OR_PARTITION

Review comment:
   These cases seem wrong. It should be the following:
   ```
 * 4. name provided, topic missing, undescribable => TOPIC_AUTHORIZATION_FAILED
* 5. name provided, topic missing, describable => 

[GitHub] [kafka] cmccabe commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-01 Thread GitBox


cmccabe commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r585094036



##
File path: KIP-500.md
##
@@ -0,0 +1,131 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits-- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0 without 
downtime.  There may be bugs, including serious ones.  You should *assume that 
your data could be lost at any time* if you try the early access release of 
KIP-500.
+
+## Generate a cluster ID
+The first step is to generate an ID for your new cluster, using the 
kafka-storage tool:
+
+
+$ ./bin/kafka-storage.sh random-uuid
+xtzWWN4bTjitpL3kfd9s5g
+
+
+## Format Storage Directories
+The next step is to format your storage directories.  If you are running in 
single-node mode, you can do this with one command:
+
+
+$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c 
./config/raft-combined.properties
+Formatting /tmp/raft-combined-logs
+
+
+If you are using multiple nodes, then you should run the format command on 
each node.  Be sure to use the same cluster ID for each one.
+
+## Start the Kafka Server
+Finally, you are ready to start the Kafka server on each node.
+
+
+$ ./bin/kafka-server-start.sh ./config/raft-combined.properties
+[2021-02-26 15:37:11,071] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
+[2021-02-26 15:37:11,294] INFO Setting -D 
jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS 
renegotiation (org.apache.zookeeper.common.X509Util)
+[2021-02-26 15:37:11,466] INFO [Log partition=@metadata-0, 
dir=/tmp/raft-combined-logs] Loading producer state till offset 0 with message 
format version 2 (kafka.log.Log)
+[2021-02-26 15:37:11,509] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
+[2021-02-26 15:37:11,640] INFO [RaftManager nodeId=1] Completed transition to 
Unattached(epoch=0, voters=[1], electionTimeoutMs=9037) 
(org.apache.kafka.raft.QuorumState)
+...
+
+
+Just like with a ZooKeeper-based broker, you can connect to port 9092 (or 
whatever port you configured) to perform administrative operations or produce 
or consume data.
+
+
+$ ./bin/kafka-topics.sh --create --topic foo --partitions 1 
--replication-factor 1 --bootstrap-server localhost:9092
+Created topic foo.
+
+
+# Deployment
+Unlike in ZooKeeper-based mode, where any server can become the controller, in 
self-managed mode, only a small group of specially selected servers can act as 
controllers.  The specially selected controller servers will participate in the 
metadata quorum.  Each KIP-500 controller server is either active, or a hot 
standby for the current active controller server.
+
+Typically you will select either 3 or 5 servers for this role, depending on 
the size of your cluster.  Just like with ZooKeeper, you must keep a majority 
of the controllers alive in order to maintain availability.  So if you have 3 
controllers, you can tolerate 1 failure; with 5 controllers, you can tolerate 2 
failures.
+
+Each Kafka server now has a new configuration key called `process.roles` which 
can have the following values:
+
+* If `process.roles` is set to `broker`, the server acts as a self-managed 
broker.
+* If `process.roles` is set to `controller`, the server acts as a self-managed 
controller.
+* If `process.roles` is set to `broker,controller`, the server acts as both a 
self-managed broker and a self-managed controller.
+* If `process.roles` is not set at all then we are assumed to be in ZooKeeper 
mode.  As mentioned earlier, you can't yet transition 

[GitHub] [kafka] rondagostino commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-01 Thread GitBox


rondagostino commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r585093860



##
File path: KIP-500.md
##
@@ -0,0 +1,131 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits-- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0 without 
downtime.  There may be bugs, including serious ones.  You should *assume that 
your data could be lost at any time* if you try the early access release of 
KIP-500.
+
+## Generate a cluster ID
+The first step is to generate an ID for your new cluster, using the 
kafka-storage tool:
+
+
+$ ./bin/kafka-storage.sh random-uuid
+xtzWWN4bTjitpL3kfd9s5g
+
+
+## Format Storage Directories
+The next step is to format your storage directories.  If you are running in 
single-node mode, you can do this with one command:
+
+
+$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c 
./config/raft-combined.properties

Review comment:
   How about `sm-{combined,broker,controller}.properties`, where `sm` stands 
for `self-managed`?  I like that better than `nozk` because in a few years we 
will still have "zk" in the name; I think a positive name -- it's a config file 
for the self-managed case -- is better than a negative name (Look Mom, no 
ZooKeeper!)









[GitHub] [kafka] cmccabe commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-01 Thread GitBox


cmccabe commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r585091489



##
File path: KIP-500.md
##
@@ -0,0 +1,131 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits-- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0 without 
downtime.  There may be bugs, including serious ones.  You should *assume that 
your data could be lost at any time* if you try the early access release of 
KIP-500.
+
+## Generate a cluster ID
+The first step is to generate an ID for your new cluster, using the 
kafka-storage tool:
+
+
+$ ./bin/kafka-storage.sh random-uuid
+xtzWWN4bTjitpL3kfd9s5g
+
+
+## Format Storage Directories
+The next step is to format your storage directories.  If you are running in 
single-node mode, you can do this with one command:
+
+
+$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c 
./config/raft-combined.properties

Review comment:
   Perhaps it could be "nozk-combined.properties", etc., just to keep it 
simple? I think that should be a separate PR, though (do you want to file it?).
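The `random-uuid` output in the quickstart above, `xtzWWN4bTjitpL3kfd9s5g`, is 22 characters long. That is the length of a 128-bit value written in URL-safe Base64 with the padding removed. The sketch below assumes that encoding; `ClusterIdSketch` is a hypothetical helper, not Kafka code. Real cluster IDs should still come from the tool.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

// Hypothetical helper (not Kafka code): builds an ID with the same shape as the
// `kafka-storage.sh random-uuid` output, ASSUMING 16 random bytes rendered as
// URL-safe Base64 with the trailing "==" padding removed.
final class ClusterIdSketch {

    static String encode(UUID uuid) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        // 16 bytes encode to 24 Base64 characters, or 22 once padding is dropped.
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    static UUID decode(String clusterId) {
        // The URL-safe decoder accepts input that has no padding.
        ByteBuffer buf = ByteBuffer.wrap(Base64.getUrlDecoder().decode(clusterId));
        return new UUID(buf.getLong(), buf.getLong());
    }

    static String randomClusterId() {
        return encode(UUID.randomUUID());
    }
}
```

Decoding the sample ID and encoding it again returns the same 22 characters. This round trip is a quick sanity check that the assumed encoding matches the quickstart output.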





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-01 Thread GitBox


cmccabe commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r585090820



##
File path: KIP-500.md
##
@@ -0,0 +1,131 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits-- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0 without 
downtime.  There may be bugs, including serious ones.  You should *assume that 
your data could be lost at any time* if you try the early access release of 
KIP-500.
+
+## Generate a cluster ID
+The first step is to generate an ID for your new cluster, using the 
kafka-storage tool:
+
+
+$ ./bin/kafka-storage.sh random-uuid
+xtzWWN4bTjitpL3kfd9s5g
+
+
+## Format Storage Directories
+The next step is to format your storage directories.  If you are running in 
single-node mode, you can do this with one command:
+
+
+$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c 
./config/raft-combined.properties
+Formatting /tmp/raft-combined-logs
+
+
+If you are using multiple nodes, then you should run the format command on 
each node.  Be sure to use the same cluster ID for each one.
+
+## Start the Kafka Server
+Finally, you are ready to start the Kafka server on each node.
+
+
+$ ./bin/kafka-server-start.sh ./config/raft-combined.properties
+[2021-02-26 15:37:11,071] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
+[2021-02-26 15:37:11,294] INFO Setting -D 
jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS 
renegotiation (org.apache.zookeeper.common.X509Util)
+[2021-02-26 15:37:11,466] INFO [Log partition=@metadata-0, 
dir=/tmp/raft-combined-logs] Loading producer state till offset 0 with message 
format version 2 (kafka.log.Log)
+[2021-02-26 15:37:11,509] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
+[2021-02-26 15:37:11,640] INFO [RaftManager nodeId=1] Completed transition to 
Unattached(epoch=0, voters=[1], electionTimeoutMs=9037) 
(org.apache.kafka.raft.QuorumState)
+...
+
+
+Just like with a ZooKeeper based broker, you can connect to port 9092 (or 
whatever port you configured) to perform administrative operations or produce 
or consume data.
+
+
+$ ./bin/kafka-topics.sh --create --topic foo --partitions 1 
--replication-factor 1 --bootstrap-server localhost:9092
+Created topic foo.
+
+
+# Deployment
+Unlike in ZooKeeper-based mode, where any server can become the controller, in 
self-managed mode, only a small group of specially selected servers can act as 
controllers.  The specially selected controller servers will participate in the 
metadata quorum.  Each KIP-500 controller server is either active, or a hot 
standby for the current active controller server.
+
+Typically you will select either 3 or 5 servers for this role, depending on 
the size of your cluster.  Just like with ZooKeeper, you must keep a majority 
of the controllers alive in order to maintain availability.  So if you have 3 
controllers, you can tolerate 1 failure; with 5 controllers, you can tolerate 2 
failures.
+
+Each Kafka server now has a new configuration key called `process.roles` which 
can have the following values:
+
+* If `process.roles` is set to `broker`, the server acts as a self-managed 
broker.
+* If `process.roles` is set to `controller`, the server acts as a self-managed 
controller.
+* If `process.roles` is set to `broker,controller`, the server acts as both a 
self-managed broker and a self-managed controller.
+* If `process.roles` is not set at all then we are assumed to be in ZooKeeper 
mode.  As mentioned earlier, you can't yet transition 
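
The Deployment section quoted above states two rules that translate directly into code. The first is which roles a `process.roles` value selects. The second is how many controller failures a quorum survives: 3 controllers tolerate 1 failure, and 5 tolerate 2. The sketch below encodes both rules. The class and enum names are hypothetical. Tolerating whitespace around commas is an assumption, since the draft README does not say whether Kafka accepts it. An unset value is treated as ZooKeeper mode, as the draft states.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of two rules from the KIP-500 README draft; not Kafka code.
final class SelfManagedModeSketch {

    enum Role { BROKER, CONTROLLER }

    /** Roles selected by a `process.roles` value; an empty set means ZooKeeper mode. */
    static Set<Role> parseProcessRoles(String value) {
        Set<Role> roles = EnumSet.noneOf(Role.class);
        if (value == null || value.trim().isEmpty()) {
            return roles; // not set at all: ZooKeeper mode
        }
        for (String part : value.split(",")) {
            String role = part.trim(); // assumption: surrounding whitespace is ignored
            switch (role) {
                case "broker":
                    roles.add(Role.BROKER);
                    break;
                case "controller":
                    roles.add(Role.CONTROLLER);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown process.roles entry: " + role);
            }
        }
        return roles;
    }

    static boolean isZooKeeperMode(String processRoles) {
        return parseProcessRoles(processRoles).isEmpty();
    }

    /** A quorum stays available while a strict majority of controllers is alive. */
    static int toleratedFailures(int controllers) {
        if (controllers < 1) {
            throw new IllegalArgumentException("A quorum needs at least one controller");
        }
        int majority = controllers / 2 + 1;
        return controllers - majority;
    }
}
```

Under this rule, a 4-controller quorum tolerates only 1 failure, the same as a 3-controller quorum. That is why odd sizes such as 3 or 5 are the usual choice.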

[GitHub] [kafka] cmccabe commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-01 Thread GitBox


cmccabe commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r585089808



##
File path: KIP-500.md
##
@@ -0,0 +1,131 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits-- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0 without 
downtime.  There may be bugs, including serious ones.  You should *assume that 
your data could be lost at any time* if you try the early access release of 
KIP-500.
+
+## Generate a cluster ID
+The first step is to generate an ID for your new cluster, using the 
kafka-storage tool:
+
+
+$ ./bin/kafka-storage.sh random-uuid
+xtzWWN4bTjitpL3kfd9s5g
+
+
+## Format Storage Directories
+The next step is to format your storage directories.  If you are running in 
single-node mode, you can do this with one command:
+
+
+$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c 
./config/raft-combined.properties
+Formatting /tmp/raft-combined-logs
+
+
+If you are using multiple nodes, then you should run the format command on 
each node.  Be sure to use the same cluster ID for each one.
+
+## Start the Kafka Server
+Finally, you are ready to start the Kafka server on each node.

Review comment:
   I'll just say "server" (singular) since it is singular in this example.









[GitHub] [kafka] cmccabe commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-01 Thread GitBox


cmccabe commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r585089537



##
File path: KIP-500.md
##
@@ -0,0 +1,131 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits-- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0 without 
downtime.  There may be bugs, including serious ones.  You should *assume that 
your data could be lost at any time* if you try the early access release of 
KIP-500.
+
+## Generate a cluster ID
+The first step is to generate an ID for your new cluster, using the 
kafka-storage tool:
+
+
+$ ./bin/kafka-storage.sh random-uuid
+xtzWWN4bTjitpL3kfd9s5g
+
+
+## Format Storage Directories
+The next step is to format your storage directories.  If you are running in 
single-node mode, you can do this with one command:
+
+
+$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c 
./config/raft-combined.properties
+Formatting /tmp/raft-combined-logs
+
+
+If you are using multiple nodes, then you should run the format command on 
each node.  Be sure to use the same cluster ID for each one.

Review comment:
   It might be better to save that discussion for the "Deployment" section, 
since we haven't explained the different deployment models yet. Or I guess we 
could move the Deployment section to be first; not sure.









[GitHub] [kafka] cmccabe commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-01 Thread GitBox


cmccabe commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r585088151



##
File path: KIP-500.md
##
@@ -0,0 +1,131 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits-- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0 without 
downtime.  There may be bugs, including serious ones.  You should *assume that 
your data could be lost at any time* if you try the early access release of 
KIP-500.
+
+## Generate a cluster ID
+The first step is to generate an ID for your new cluster, using the 
kafka-storage tool:
+
+
+$ ./bin/kafka-storage.sh random-uuid
+xtzWWN4bTjitpL3kfd9s5g
+
+
+## Format Storage Directories
+The next step is to format your storage directories.  If you are running in 
single-node mode, you can do this with one command:

Review comment:
   I wanted to avoid talking about combined mode before the section on 
Deployment. However, I will change this to "For a single-process test 
cluster..." to keep things simple.









[jira] [Commented] (KAFKA-12394) Consider topic id existence and authorization errors

2021-03-01 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293197#comment-17293197
 ] 

Ismael Juma commented on KAFKA-12394:
-

Ah, yes. "topic IDs are not considered sensitive information" is what I was 
thinking about.

> Consider topic id existence and authorization errors
> 
>
> Key: KAFKA-12394
> URL: https://issues.apache.org/jira/browse/KAFKA-12394
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> We have historically had logic in the API layer to avoid leaking the 
> existence or non-existence of topics to clients that are not authorized to 
> describe them. The way we have done this is to always authorize the topic 
> name first before checking existence.
> Topic IDs make this more difficult because the resource (i.e. the topic name) 
> has to be derived from the ID. This means we have to check the existence of 
> the topic first. If the topic does not exist, then our hands are tied and we 
> have to return UNKNOWN_TOPIC_ID. If the topic does exist, then we need to 
> check whether the client is authorized to describe it. The question, then, is 
> what we should do if the client is not authorized.
> The current behavior is to return UNKNOWN_TOPIC_ID. The downside is that this 
> is misleading and forces the client to retry even though it is doomed to hit 
> the same error. However, the client should generally handle this by 
> requesting Metadata using the topic name that it is interested in, which 
> would give it a chance to see the topic authorization error. Basically, the 
> fact that you need describe permission in the first place to discover the 
> topic ID makes this an unlikely scenario.
> There is an argument to be made for TOPIC_AUTHORIZATION_FAILED as well. 
> Basically, we could take the stance that we do not care about leaking the 
> existence of topic IDs since they do not reveal anything about the underlying 
> topic. Additionally, there is little likelihood of a user discovering a valid 
> UUID by accident or even through brute force. The benefit of this is that 
> users get a clear error for cases where a topic ID may have been discovered 
> through some external means. For example, an administrator finds a topic ID 
> in the logs and attempts to delete it using the new `deleteTopicsWithIds` 
> Admin API.
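
The ordering problem described in the issue can be sketched in a few lines. The authorization resource, a topic name, is only known after the ID has been resolved. In the sketch below, `ApiError` is a hypothetical stand-in for Kafka's error codes. The map and the predicate stand in for the broker's metadata and authorizer. The `revealAuthorizationFailure` flag switches between the current behavior and the TOPIC_AUTHORIZATION_FAILED alternative. None of these names is a real Kafka API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

// Hypothetical sketch of the check ordering discussed in KAFKA-12394.
final class TopicIdLookupSketch {

    enum ApiError { NONE, UNKNOWN_TOPIC_ID, TOPIC_AUTHORIZATION_FAILED }

    static ApiError describeById(UUID topicId,
                                 Map<UUID, String> topicNamesById,
                                 Predicate<String> canDescribe,
                                 boolean revealAuthorizationFailure) {
        // 1. Existence must be checked first: the name is derived from the ID.
        String name = topicNamesById.get(topicId);
        if (name == null) {
            // No name means nothing to authorize, so this is the only possible answer.
            return ApiError.UNKNOWN_TOPIC_ID;
        }
        // 2. Only now can the derived name be authorized.
        if (!canDescribe.test(name)) {
            // Current behavior hides existence; the alternative reports the real cause.
            return revealAuthorizationFailure
                    ? ApiError.TOPIC_AUTHORIZATION_FAILED
                    : ApiError.UNKNOWN_TOPIC_ID;
        }
        return ApiError.NONE;
    }
}
```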



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-03-01 Thread GitBox


kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r585078292



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
##
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+    public static final Logger log = LoggerFactory.getLogger(SynchronizationTest.class);
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    private String threadPrefix;
+    private Plugins plugins;
+    private ThreadPoolExecutor exec;
+    private Breakpoint dclBreakpoint;
+    private Breakpoint pclBreakpoint;
+
+    @Before
+    public void setup() {
+        TestPlugins.assertAvailable();
+        Map pluginProps = Collections.singletonMap(
+            WorkerConfig.PLUGIN_PATH_CONFIG,
+            String.join(",", TestPlugins.pluginPath())
+        );
+        threadPrefix = SynchronizationTest.class.getSimpleName()
+            + "." + testName.getMethodName() + "-";
+        dclBreakpoint = new Breakpoint<>();
+        pclBreakpoint = new Breakpoint<>();
+        plugins = new Plugins(pluginProps) {
+            @Override
+            protected DelegatingClassLoader newDelegatingClassLoader(List paths) {
+                return AccessController.doPrivileged(
+                    (PrivilegedAction) () ->
+                        new SynchronizedDelegatingClassLoader(paths)
+                );
+            }
+        };
+        exec = new ThreadPoolExecutor(
+            2,
+            2,
+            1000L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingDeque<>(),
+            threadFactoryWithNamedThreads(threadPrefix)
+        );
+
+    }
+
+    @After
+    public void tearDown() throws InterruptedException {
+        dclBreakpoint.clear();
+        pclBreakpoint.clear();
+        exec.shutdown();
+        exec.awaitTermination(1L, TimeUnit.SECONDS);
+    }
+
+    private static class Breakpoint {
+
+        private Predicate predicate;
+        private CyclicBarrier barrier;
+
+        public synchronized void clear() {
+            if (barrier != null) {
+                barrier.reset();
+            }
+            predicate = null;
+            barrier = null;
+        }
+
+        public synchronized void set(Predicate predicate) {
+            clear();
+            this.predicate = predicate;
+            // As soon as the barrier is tripped, the barrier will be reset for the next round.
+            barrier = new CyclicBarrier(2);
+        }
+
+        /**
+         * From a thread under test, await for the test orchestrator to continue execution
+         * @param obj Object to test with the breakpoint's current predicate
+ 
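
The `Breakpoint` helper in the test above pairs a predicate with a two-party `CyclicBarrier`. This pairing means a thread under test pauses only on inputs the orchestrator cares about. The quoted diff is cut off before its wait methods. The `await` and `testAwait` methods below are therefore hypothetical, as are the one-second timeouts. This is a standalone sketch of the pattern, not the code in the PR.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Hypothetical standalone sketch of a predicate-gated breakpoint.
final class BreakpointSketch<T> {

    private Predicate<T> predicate;
    private CyclicBarrier barrier;

    synchronized void clear() {
        if (barrier != null) {
            barrier.reset(); // any current waiter gets BrokenBarrierException
        }
        predicate = null;
        barrier = null;
    }

    synchronized void set(Predicate<T> predicate) {
        clear();
        this.predicate = predicate;
        // Two parties: the thread under test and the orchestrator.
        // A CyclicBarrier resets itself automatically once it trips.
        barrier = new CyclicBarrier(2);
    }

    /** Thread under test: blocks only if obj matches the current predicate. */
    void await(T obj) throws InterruptedException, BrokenBarrierException, TimeoutException {
        CyclicBarrier current;
        synchronized (this) {
            if (predicate == null || !predicate.test(obj)) {
                return;
            }
            current = barrier;
        }
        // Wait outside the monitor so that clear() and testAwait() are not blocked.
        current.await(1, TimeUnit.SECONDS);
    }

    /** Orchestrator: waits until the thread under test reaches the breakpoint. */
    void testAwait() throws InterruptedException, BrokenBarrierException, TimeoutException {
        CyclicBarrier current;
        synchronized (this) {
            current = barrier;
        }
        if (current == null) {
            throw new IllegalStateException("breakpoint not set");
        }
        current.await(1, TimeUnit.SECONDS);
    }
}
```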

[jira] [Updated] (KAFKA-12396) Dedicated exception for kstreams when null key received

2021-03-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12396:

Labels: beginner newbie  (was: )

> Dedicated exception for kstreams when null key received
> ---
>
> Key: KAFKA-12396
> URL: https://issues.apache.org/jira/browse/KAFKA-12396
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Veniamin Kalegin
>Priority: Trivial
>  Labels: beginner, newbie
>
> If a Kafka Streams application receives a null key (thanks to QA), the app 
> produces a long and confusing stack trace. It would be nice to have a shorter, 
> more specific exception instead of
> {{org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=(hidden), 
> partition=0, offset=3722, stacktrace=java.lang.NullPointerException}}
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286)
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74)
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94)
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29)
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133)
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x60630fd0.get(Unknown
>  Source)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133)
>  at 
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$$Lambda$1047/0x60630b10.run(Unknown
>  Source)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask$$Lambda$1046/0x605250f0.run(Unknown
>  Source)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286)
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74)
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94)
>  at 
> 
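
Here is a minimal sketch of the kind of fail-fast check the issue asks for. It assumes the check runs before the key reaches a state store lookup such as the `RocksDBStore.get` call in the trace. `NullKeySketch`, `NullKeyException`, and `requireNonNullKey` are hypothetical names, not an existing Kafka Streams API.

```java
// Hypothetical sketch for KAFKA-12396; not an existing Kafka Streams API.
final class NullKeySketch {

    /** Hypothetical dedicated exception carrying the record's coordinates. */
    static final class NullKeyException extends RuntimeException {
        NullKeyException(String topic, int partition, long offset) {
            super("Received a record with a null key (topic=" + topic
                    + ", partition=" + partition + ", offset=" + offset
                    + "); a key-value store lookup needs a non-null key");
        }
    }

    /** Fails fast, before the key is passed to a key-value store lookup. */
    static <K> K requireNonNullKey(K key, String topic, int partition, long offset) {
        if (key == null) {
            throw new NullKeyException(topic, partition, offset);
        }
        return key;
    }
}
```

With a dedicated type, the error handler can report the topic, partition, and offset in one line instead of a deep `NullPointerException` trace.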

[jira] [Commented] (KAFKA-12377) Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener

2021-03-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293191#comment-17293191
 ] 

Matthias J. Sax commented on KAFKA-12377:
-

One more

> Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener
> 
>
> Key: KAFKA-12377
> URL: https://issues.apache.org/jira/browse/KAFKA-12377
> Project: Kafka
>  Issue Type: Test
>  Components: core, security, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote}org.opentest4j.AssertionFailedError: expected:  
> but was:  at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at 
> org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckClientConnectionFailure(SaslAuthenticatorTest.java:2187)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckSslAuthenticationFailure(SaslAuthenticatorTest.java:2210)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.verifySslClientAuthForSaslSslListener(SaslAuthenticatorTest.java:1846)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.testSslClientAuthRequiredForSaslSslListener(SaslAuthenticatorTest.java:1800){quote}
> STDOUT
> {quote}[2021-02-26 07:18:57,220] ERROR Extensions provided in login context 
> without a token 
> (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule:318) 
> java.io.IOException: Extensions provided in login context without a token at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
>  at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
>  at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
> [...]
> Caused by: 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException:
>  Extensions provided in login context without a token at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:192)
>  at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163)
>  ... 116 more{quote}





[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata

2021-03-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293187#comment-17293187
 ] 

Matthias J. Sax commented on KAFKA-10251:
-

Failed again.

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
> -
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 200 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at 
> kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>  
>  
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic 
> output-topic with key: 9955, value: 9955 with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error 
> when sending message to topic output-topic with key: 9959, value: 9959 with 
> error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-03-01 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293185#comment-17293185
 ] 

Randall Hauch edited comment on KAFKA-10340 at 3/1/21, 9:34 PM:


Merged to `trunk` and backported to:
* `2.7` for inclusion in the upcoming 2.7.1
* `2.6` for inclusion in the upcoming 2.6.2

The `2.8` branch is currently blocked for the 2.8.0 release, so I've created 
https://github.com/apache/kafka/pull/10238 for the 2.8 backport, to be merged 
either once this is approved as a 2.8.0 blocker or after 2.8.0 has been released.

Leaving this issue open until 2.8 is addressed.



> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Arjun Satish
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.0.0, 2.7.1, 2.6.2
>
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to log an info or error message (or otherwise inform the user) 
> that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permission to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} APIs in 
> AdminClient to check whether the topic exists and whether the broker has 
> {{auto.create.topics.enable}} turned on; if neither is the case, it should 
> throw an error.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes an even narrower corner case: when topic creation settings 
> are enabled, the worker should handle the case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false, and the topic does not 
> exist.
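The check described above can be sketched as a pure decision function. This is a minimal sketch under stated assumptions: the three inputs are taken as already gathered (in a real worker, topic existence might come from `Admin#listTopics` and the broker setting from `Admin#describeConfigs`), and the `TopicCheck` class and its `Action` enum are hypothetical names, not part of Kafka Connect.

```java
import java.util.Set;

// Hypothetical helper sketching the pre-send check proposed in KAFKA-10340;
// it is not part of the Kafka Connect API.
final class TopicCheck {

    // What the worker could do before producing to a destination topic.
    enum Action { PRODUCE, WORKER_CREATES_TOPIC, BROKER_AUTO_CREATES, FAIL_FAST }

    private TopicCheck() { }

    /**
     * @param existingTopics      topic names visible to the worker (e.g. from Admin#listTopics)
     * @param topic               destination topic of the source record
     * @param workerTopicCreation whether KIP-158 topic creation applies to this topic
     * @param brokerAutoCreate    broker value of auto.create.topics.enable (e.g. from Admin#describeConfigs)
     */
    static Action decide(Set<String> existingTopics, String topic,
                         boolean workerTopicCreation, boolean brokerAutoCreate) {
        if (existingTopics.contains(topic)) {
            return Action.PRODUCE;              // topic exists: send as usual
        }
        if (workerTopicCreation) {
            return Action.WORKER_CREATES_TOPIC; // worker creates the topic itself
        }
        if (brokerAutoCreate) {
            return Action.BROKER_AUTO_CREATES;  // broker can create it on demand
        }
        // Nobody will ever create the topic, so report an error instead of
        // letting producer.send() retry indefinitely.
        return Action.FAIL_FAST;
    }
}
```

For example, `decide(Set.of("orders"), "payments", false, false)` yields `FAIL_FAST`, which is the hang scenario from the description turned into an explicit error. Keeping the decision free of I/O makes it easy to unit test separately from the Admin calls.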





[jira] [Commented] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-03-01 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293185#comment-17293185
 ] 

Randall Hauch commented on KAFKA-10340:
---

Merged to `trunk` and backported to:
* `2.7` for inclusion in the upcoming 2.7.1
* `2.6` for inclusion in the upcoming 2.6.2

The `2.8` branch is currently blocked for the 2.8.0 release, so I've created 
https://github.com/apache/kafka/pull/10238 for the 2.8 backport, to be merged 
either once this is approved as a 2.8.0 blocker or after 2.8.0 has been released.



[jira] [Updated] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-03-01 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10340:
--
Fix Version/s: 2.6.2



[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2021-03-01 Thread Gordon (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293184#comment-17293184
 ] 

Gordon commented on KAFKA-8154:
---

Yes, that's the right link. I've built Kafka 2.6.1 with PR 9188 applied:

https://github.com/gordonmessmer/kafkajar

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785





[jira] [Commented] (KAFKA-10788) Streamlining Tests in CachingInMemoryKeyValueStoreTest

2021-03-01 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293179#comment-17293179
 ] 

Bruno Cadonna commented on KAFKA-10788:
---

[~rohitdeshaws], Thank you for looking into this!

# I think you are better off starting a new test class for 
{{CachingKeyValueStore}} that does not extend {{AbstractKeyValueStoreTest}}. 
Keep in mind that you only want to test the code in {{CachingKeyValueStore}} 
and not the code in the underlying store.
# You do not need a real underlying store for 
{{shouldAvoidFlushingDeletionsWithoutDirtyKeys}}. You can record the calls you 
expect on the mock for the underlying store, switch it to replay mode with 
{{replay()}}, and then check the calls on the mock with {{verify()}}. See the 
EasyMock docs for more details.
# Are you familiar with EasyMock? If not, I can imagine that some tests might 
not seem comprehensible at first sight. See the EasyMock docs to understand them 
better and let us know what you do not understand.

Hope that helps to unblock you.

> Streamlining Tests in CachingInMemoryKeyValueStoreTest
> --
>
> Key: KAFKA-10788
> URL: https://issues.apache.org/jira/browse/KAFKA-10788
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Rohit Deshpande
>Priority: Major
>  Labels: newbie
>
> While reviewing KIP-614, it was decided that the tests in 
> [CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588]
>  need to be streamlined to use a mocked underlyingStore.
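To illustrate the record/verify idea without depending on EasyMock, here is a minimal sketch that uses a hand-rolled recording fake instead. The `SimpleStore` interface and `WriteBackCache` class are hypothetical simplifications, not Kafka Streams' `CachingKeyValueStore`; the point is only that the test exercises the caching layer alone and checks which calls reach the underlying store.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified store interface (not the Kafka Streams KeyValueStore API).
interface SimpleStore {
    void put(String key, String value);
    String get(String key);
}

// Fake underlying store that records every call made on it; it plays the
// role an EasyMock mock would play, minus the expect/replay/verify API.
final class RecordingStore implements SimpleStore {
    final List<String> calls = new ArrayList<>();

    @Override
    public void put(String key, String value) {
        calls.add("put(" + key + "," + value + ")");
    }

    @Override
    public String get(String key) {
        calls.add("get(" + key + ")");
        return null;
    }
}

// Hypothetical write-back cache: writes stay in memory until flush().
final class WriteBackCache implements SimpleStore {
    private final SimpleStore underlying;
    // Insertion order keeps the sequence of flushed writes deterministic.
    private final Map<String, String> dirty = new LinkedHashMap<>();

    WriteBackCache(SimpleStore underlying) {
        this.underlying = underlying;
    }

    @Override
    public void put(String key, String value) {
        dirty.put(key, value); // only the latest value per key is kept
    }

    @Override
    public String get(String key) {
        String cached = dirty.get(key);
        return cached != null ? cached : underlying.get(key); // miss: ask the underlying store
    }

    void flush() {
        dirty.forEach(underlying::put); // push each dirty entry down exactly once
        dirty.clear();
    }
}
```

A test then writes through the cache, asserts that `RecordingStore.calls` is still empty, flushes, and asserts that exactly one `put` arrived. With EasyMock the same expectations would be recorded up front and checked with `verify()`.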





[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2021-03-01 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293178#comment-17293178
 ] 

Marco Lotz commented on KAFKA-5146:
---

[~ableegoldman] [~mjsax] whenever you have some spare time, could you please take 
a quick look at the PR?

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael G. Noll
>Assignee: Marco Lotz
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
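> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}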
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is that the Kafka Streams *examples* 
> showcase some JSON processing.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so that we can 
> continue showcasing JSON support. Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).





[jira] [Created] (KAFKA-12396) Dedicated exception for kstreams when null key received

2021-03-01 Thread Veniamin Kalegin (Jira)
Veniamin Kalegin created KAFKA-12396:


 Summary: Dedicated exception for kstreams when null key received
 Key: KAFKA-12396
 URL: https://issues.apache.org/jira/browse/KAFKA-12396
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.6.0
Reporter: Veniamin Kalegin


If a Kafka Streams application receives a null key (in our case, sent by QA), it 
fails with a long and confusing stack trace. It would be nice to have a shorter, 
more specific exception instead of the following:
{code:java}
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=(hidden), partition=0, offset=3722, stacktrace=java.lang.NullPointerException
  at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286)
  at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74)
  at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94)
  at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29)
  at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133)
  at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x60630fd0.get(Unknown Source)
  at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
  at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133)
  at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78)
  at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
  at org.apache.kafka.streams.processor.internals.ProcessorNode$$Lambda$1047/0x60630b10.run(Unknown Source)
  at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
  at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
  at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
  at org.apache.kafka.streams.processor.internals.StreamTask$$Lambda$1046/0x605250f0.run(Unknown Source)
  at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
  at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
  at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
  at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
  at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.NullPointerException
  at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286)
  at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74)
  at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94)
  at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29)
  at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133)
  at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x60630fd0.get(Unknown Source)
  at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
{code}
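One way to get the shorter error the reporter asks for is to validate the key before it reaches the state store. The sketch below is purely illustrative: `NullKeyException` and `KeyGuard.requireKey` are invented names, not Kafka Streams API, and the context fields (topic, partition, offset) simply mirror those printed in the trace above.

```java
// Hypothetical dedicated exception for a record whose key is unexpectedly null.
final class NullKeyException extends RuntimeException {
    NullKeyException(String topic, int partition, long offset) {
        super(String.format(
            "Received a null key from topic=%s, partition=%d, offset=%d; "
                + "key-based operations such as state store lookups require a non-null key",
            topic, partition, offset));
    }
}

// Hypothetical guard that would run before any key-based store access.
final class KeyGuard {
    private KeyGuard() { }

    // Returns the key unchanged, or fails fast with a message naming the offending record.
    static <K> K requireKey(K key, String topic, int partition, long offset) {
        if (key == null) {
            throw new NullKeyException(topic, partition, offset);
        }
        return key;
    }
}
```

The design choice is simply to fail at the boundary with one readable line instead of letting the `NullPointerException` surface from deep inside `RocksDBStore.get`.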

[jira] [Comment Edited] (KAFKA-12394) Consider topic id existence and authorization errors

2021-03-01 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293176#comment-17293176
 ] 

Justine Olshan edited comment on KAFKA-12394 at 3/1/21, 9:20 PM:
-

The KIP states:

return {{UNKNOWN_TOPIC_ID}} error for the topic indicating the topic ID did not 
exist. The check for the topic ID will occur before checking authorization on 
the topic. Thus, topic IDs are not considered sensitive information.


So this would imply that {{TOPIC_AUTHORIZATION_FAILED}} would be acceptable under 
the KIP. I think there was some discussion on a PR about revealing the 
existence of topics.

One thing I'm a little confused about is how this is different from returning 
{{UNKNOWN_TOPIC_OR_PARTITION}} on describe authorization failure. I guess the 
administrator is not able to find a topic name in the logs, but besides that, I 
think the scenarios are similar.

 



> Consider topic id existence and authorization errors
> 
>
> Key: KAFKA-12394
> URL: https://issues.apache.org/jira/browse/KAFKA-12394
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> We have historically had logic in the API layer to avoid leaking the 
> existence or non-existence of topics to clients which are not authorized to 
> describe them. The way we have done this is to always authorize the topic 
> name first before checking existence.
> Topic IDs make this more difficult because the resource (i.e., the topic name) 
> has to be derived. This means we have to check existence of the topic first. 
> If the topic does not exist, then our hands are tied and we have to return 
> UNKNOWN_TOPIC_ID. If the topic does exist, then we need to check if the 
> client is authorized to describe it. The question, then, is what we should 
> do if the client is not authorized.
> The current behavior is to return UNKNOWN_TOPIC_ID. The downside is that this 
> is misleading and forces the client to retry even though they are doomed to 
> hit the same error. However, the client should generally handle this by 
> requesting Metadata using the topic name that they are interested in, which 
> would give them a chance to see the topic authorization error. Basically the 
> fact that you need describe permission in the first place to discover the 
> topic ID makes this an unlikely scenario.
> There is an argument to be made for TOPIC_AUTHORIZATION_FAILED as well. 
> Basically we could take the stance that we do not care about leaking the 
> existence of topic IDs since they do not reveal anything about the underlying 
> topic. Additionally, there is little likelihood of a user discovering a valid 
> UUID by accident or even through brute force. The benefit of this is that 
> users get a clear error for cases where a topic ID may have been discovered 
> through some external means. For example, an administrator finds a topic ID 
> in the logs and attempts to delete it using the new `deleteTopicsWithIds` 
> Admin API.
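The ordering problem described above can be sketched as follows. This is a minimal, hypothetical model, not broker code: the result names reuse the error codes from the discussion, `java.util.UUID` stands in for Kafka's own `Uuid` type, and the `revealExistence` switch (choosing between the current behavior and the proposed `TOPIC_AUTHORIZATION_FAILED`) is an illustrative assumption.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical model of the lookup order discussed in KAFKA-12394; not broker code.
final class TopicIdLookup {

    // Error names borrowed from the discussion; this enum is a local stand-in for Errors.
    enum Result { NONE, UNKNOWN_TOPIC_ID, TOPIC_AUTHORIZATION_FAILED }

    private TopicIdLookup() { }

    static Result check(Map<UUID, String> idToName, Set<String> describableNames,
                        UUID topicId, boolean revealExistence) {
        // With an ID, the name (the resource to authorize) must be derived first,
        // so existence is necessarily checked before authorization.
        String name = idToName.get(topicId);
        if (name == null) {
            return Result.UNKNOWN_TOPIC_ID;
        }
        if (!describableNames.contains(name)) {
            // false: current behavior (indistinguishable from a missing ID)
            // true:  proposed alternative (clear error, reveals that the ID exists)
            return revealExistence ? Result.TOPIC_AUTHORIZATION_FAILED : Result.UNKNOWN_TOPIC_ID;
        }
        return Result.NONE;
    }
}
```

With `revealExistence == false`, an unauthorized client cannot tell an unknown ID from a hidden topic; with `true`, the administrator in the example above gets an explicit authorization error instead of a misleading retriable one.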





[jira] [Comment Edited] (KAFKA-12394) Consider topic id existence and authorization errors

2021-03-01 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293176#comment-17293176
 ] 

Justine Olshan edited comment on KAFKA-12394 at 3/1/21, 9:20 PM:
-

The KIP states:

return {{UNKNOWN_TOPIC_ID}} error for the topic indicating the topic ID did not 
exist. The check for the topic ID will occur before checking authorization on 
the topic. Thus, topic IDs are not considered sensitive information.

So this would imply that {{TOPIC_AUTHORIZATION_FAILED}} would be acceptable under 
the KIP. I think there was some discussion on a PR about revealing the 
existence of topics, but the KIP does state that topic IDs are not "sensitive".

One thing I'm a little confused about is how this is different from returning 
{{UNKNOWN_TOPIC_OR_PARTITION}} on describe authorization failure. I guess the 
administrator is not able to find a topic name in the logs, but besides that, I 
think the scenarios are similar.

 





[jira] [Commented] (KAFKA-12394) Consider topic id existence and authorization errors

2021-03-01 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293176#comment-17293176
 ] 

Justine Olshan commented on KAFKA-12394:


The KIP states:

return {{UNKNOWN_TOPIC_ID}} error for the topic indicating the topic ID did not 
exist. The check for the topic ID will occur before checking authorization on 
the topic. Thus, topic IDs are not considered sensitive information.


So this would imply that {{TOPIC_AUTHORIZATION_FAILED}} would be acceptable under 
the KIP. I think there was some discussion on a PR about revealing the 
existence of topics.

One thing I'm a little confused about is how this is different from returning 
{{UNKNOWN_TOPIC_OR_PARTITION}} on describe authorization failure. I guess the 
administrator is not able to find a topic name in the logs, but besides that, I 
think the scenarios are similar.

 



[GitHub] [kafka] MarcoLotz commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-03-01 Thread GitBox


MarcoLotz commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-788295749


   @mjsax could you please take a quick look at the updated tests?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vanhoale opened a new pull request #10239: KAFKA-12372 - Enhance TimestampConverter to handle multiple timestamp or date fields

2021-03-01 Thread GitBox


vanhoale opened a new pull request #10239:
URL: https://github.com/apache/kafka/pull/10239


   [JIRA](https://issues.apache.org/jira/browse/KAFKA-12372)
   
   Our team needs to handle multiple timestamp fields in a single Kafka 
message; with the current converter, we have to add more configuration for 
every additional field.
   
   We can implement this generically: check whether field.schema().name() 
is a timestamp or date logical type and, if so, convert the field.
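The generic approach proposed in the PR (convert every field whose schema name marks it as a timestamp or date) can be sketched without Connect on the classpath. In the Connect data API the logical type names are `org.apache.kafka.connect.data.Timestamp` and `org.apache.kafka.connect.data.Date`; the `Field` class and `fieldsToConvert` method below are hypothetical stand-ins for iterating over `schema.fields()` and reading `field.schema().name()`, not the actual `TimestampConverter` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class TimestampFieldScanner {

    // Logical type names used by Kafka Connect's Timestamp and Date schemas.
    static final Set<String> TEMPORAL_LOGICAL_NAMES = Set.of(
        "org.apache.kafka.connect.data.Timestamp",
        "org.apache.kafka.connect.data.Date");

    // Hypothetical stand-in for a Connect Field: its name plus its schema's name (may be null).
    static final class Field {
        final String name;
        final String schemaName;

        Field(String name, String schemaName) {
            this.name = name;
            this.schemaName = schemaName;
        }
    }

    private TimestampFieldScanner() { }

    // Collects every field whose schema name marks it as a timestamp or date,
    // instead of requiring each field to be listed in the transform config.
    static List<String> fieldsToConvert(List<Field> fields) {
        List<String> result = new ArrayList<>();
        for (Field f : fields) {
            // Null check first: Set.of(...).contains(null) throws NullPointerException.
            if (f.schemaName != null && TEMPORAL_LOGICAL_NAMES.contains(f.schemaName)) {
                result.add(f.name);
            }
        }
        return result;
    }
}
```

Fields without a schema name (plain primitives) are skipped, so only fields explicitly typed as logical timestamps or dates would be converted.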







[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585036724



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
   val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
 results.asScala.filter(result => result.name() != null))(_.name)
   results.forEach { topic =>
-val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
- if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-   topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-   topic.setErrorMessage("Topic IDs are not supported on the server.")
- } else if (unresolvedTopicId)
- topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-   topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (!authorizedDeleteTopics.contains(topic.name))
-   topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
- else if (!metadataCache.contains(topic.name))
-   topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- else
-   toDelete += topic.name
+val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+  topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+  topic.setErrorMessage("Topic IDs are not supported on the server.")
+} else if (unresolvedTopicId) {
+  topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+} else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+  // Because the client does not have Describe permission, the name should
+  // not be returned in the response.
+  topic.setName(null)

Review comment:
   There is one dependency here that makes this a bit more work than I 
wanted to take on in this PR: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1909.
 I filed a separate JIRA so that we don't forget about it: 
https://issues.apache.org/jira/browse/KAFKA-12395.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12395) Drop topic mapKey in DeleteTopics response

2021-03-01 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12395:
---

 Summary: Drop topic mapKey in DeleteTopics response
 Key: KAFKA-12395
 URL: https://issues.apache.org/jira/browse/KAFKA-12395
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Now that DeleteTopics requests/responses may be keyed by topicId, using the 
topic name as a map key in the response makes less sense. We should 
consider dropping it. 
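To make the problem concrete: once a caller without Describe permission has its topic name withheld (set to null, as in the `topic.setName(null)` change discussed in PR #10223), several results can share the same "name". The sketch below is a minimal illustration under stated assumptions: it uses a plain `java.util.HashMap` and a hypothetical `Result` type, not Kafka's generated response collections (which may handle duplicates differently), to show how a name-keyed map can lose entries while an ID-keyed one does not.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of DeleteTopics results; not Kafka's generated message classes.
class NameKeyedResponseDemo {

    static final class Result {
        final UUID topicId;
        final String name; // null when the caller lacks Describe permission

        Result(UUID topicId, String name) {
            this.topicId = topicId;
            this.name = name;
        }
    }

    // Keyed by name: HashMap permits a single null key, so a second result
    // with a hidden (null) name silently replaces the first one.
    static Map<String, Result> keyByName(List<Result> results) {
        Map<String, Result> byName = new HashMap<>();
        for (Result r : results) {
            byName.put(r.name, r);
        }
        return byName;
    }

    // Keyed by topic ID: entries stay distinct even when names are withheld.
    static Map<UUID, Result> keyById(List<Result> results) {
        Map<UUID, Result> byId = new HashMap<>();
        for (Result r : results) {
            byId.put(r.topicId, r);
        }
        return byId;
    }
}
```

With two unauthorized topics, `keyByName` returns one entry while `keyById` keeps both.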



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10383) KTable Join on Foreign key is opinionated

2021-03-01 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293166#comment-17293166
 ] 

Marco Lotz commented on KAFKA-10383:


[~mjsax] the KIP is available 
[here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-718%3A+Make+KTable+Join+on+Foreign+key+unopinionated].
 I will send the discuss email later today or tomorrow.

> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Assignee: Marco Lotz
>Priority: Major
>  Labels: needs-kip
>
> *Status Quo:*
>  The current implementation of [KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]
> (foreign-key join between two KTables) is _opinionated_ in terms of the storage 
> layer.
> Regardless of the materialization method provided in the method argument, 
> it creates an intermediate RocksDB state store. Thus, even when the 
> requested materialization is "in memory", RocksDB is still used 
> under the hood for this internal state store.
>  
> *Related problems:*
>  * IT tests: An implicit materialization method for the state store 
> affects tests that use foreign-key state stores. [On Windows-based systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application],
>  which are affected by the RocksDB file-system removal problem, one way to 
> avoid the bug is to use in-memory state stores (rather than swallowing 
> exceptions). Because the intermediate RocksDB store is created regardless of 
> the materialization method, every IT test is forced to fall back on the hack of 
> deleting the state directory manually and swallowing the exception.
>  * Short-lived streams: KTables can be short-lived, in which case neither 
> persistent storage nor changelog creation is desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to use a materialization method similar to the one 
> provided in the argument when creating the intermediate foreign-key 
> state store. If the materialization is in memory and without a changelog, the 
> same should apply to the intermediate state store. 
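A minimal sketch of the suggestion, using a hypothetical `StoreSpec` model rather than the Kafka Streams `Materialized`/`Stores` API: the status quo ignores the requested store type, while the suggested behavior mirrors it, including whether a changelog is enabled. Modeling the status quo as "always RocksDB with a changelog" is an assumption drawn from the issue text.

```java
// Hypothetical model of a store configuration; not the Kafka Streams API.
class InternalStoreChoice {

    enum StoreType { IN_MEMORY, ROCKS_DB }

    static final class StoreSpec {
        final StoreType type;
        final boolean changelogEnabled;

        StoreSpec(StoreType type, boolean changelogEnabled) {
            this.type = type;
            this.changelogEnabled = changelogEnabled;
        }
    }

    // Status quo as described in the issue: `requested` is ignored and the
    // internal foreign-key store is always RocksDB (with a changelog, as the issue implies).
    static StoreSpec statusQuo(StoreSpec requested) {
        return new StoreSpec(StoreType.ROCKS_DB, true);
    }

    // Suggested behavior: mirror the caller's materialization choice.
    static StoreSpec mirrored(StoreSpec requested) {
        return new StoreSpec(requested.type, requested.changelogEnabled);
    }
}
```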





[GitHub] [kafka] jolshan commented on pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

2021-03-01 Thread GitBox


jolshan commented on pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#issuecomment-788251941


   @hachikuji Ah sorry. I was misreading the file name. Makes sense.







[jira] [Commented] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-03-01 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293161#comment-17293161
 ] 

Boyang Chen commented on KAFKA-12381:
-

[~cmccabe] Could you provide more specific details here, such as which test 
case is failing and where you were fixing the log grep?

> Incompatible change in verifiable_producer.log in 2.8
> -
>
> Key: KAFKA-12381
> URL: https://issues.apache.org/jira/browse/KAFKA-12381
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Boyang Chen
>Priority: Blocker
>
> In test_verifiable_producer.py, we used to see this error message in 
> verifiable_producer.log when a topic couldn't be created:
> WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> In this case the test greps the log for LEADER_NOT_AVAILABLE, and it 
> used to pass.
> Now we are instead seeing this in the log file:
> WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
> And of course now the test fails.
> The INVALID_REPLICATION_FACTOR error comes from the new auto topic creation 
> manager.
> It is a simple matter to make the test pass: I have confirmed that it 
> passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of 
> LEADER_NOT_AVAILABLE.
> I think we just need to decide whether this change in behavior is acceptable.
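If the new error is accepted, one option is to make the check tolerant of both codes rather than grepping for a single string. The real check lives in the Python system test; the Java helper below (`MetadataWarnCheck` is a hypothetical name, not part of Kafka) is only a sketch that parses the `{topic=ERROR}` fragment of the WARN line quoted above and matches it against a set of accepted codes.

```java
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only.
class MetadataWarnCheck {

    // One "topic=ERROR_CODE" entry inside the braces of the NetworkClient WARN line.
    private static final Pattern ENTRY = Pattern.compile("([A-Za-z0-9._-]+)=([A-Z_]+)");

    // Returns the error code logged for `topic`, if any.
    static Optional<String> errorFor(String logLine, String topic) {
        int open = logLine.indexOf('{');
        int close = open < 0 ? -1 : logLine.indexOf('}', open + 1);
        if (close < 0) {
            return Optional.empty();
        }
        // Only look inside the braces, so "clientId=producer-1" is never matched.
        Matcher m = ENTRY.matcher(logLine.substring(open + 1, close));
        while (m.find()) {
            if (m.group(1).equals(topic)) {
                return Optional.of(m.group(2));
            }
        }
        return Optional.empty();
    }

    // Passes if the logged error is any accepted code, e.g. both the old and the new one.
    static boolean topicCreationFailed(String logLine, String topic, Set<String> acceptedErrors) {
        return errorFor(logLine, topic).map(acceptedErrors::contains).orElse(false);
    }
}
```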





[jira] [Updated] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-03-01 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-12381:

Component/s: core

> Incompatible change in verifiable_producer.log in 2.8
> -
>
> Key: KAFKA-12381
> URL: https://issues.apache.org/jira/browse/KAFKA-12381
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Boyang Chen
>Priority: Blocker
>





[jira] [Updated] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-03-01 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-12381:

Labels: kip-500  (was: )

> Incompatible change in verifiable_producer.log in 2.8
> -
>
> Key: KAFKA-12381
> URL: https://issues.apache.org/jira/browse/KAFKA-12381
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Boyang Chen
>Priority: Blocker
>  Labels: kip-500
>





[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585027051



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -3479,6 +3481,161 @@ class KafkaApisTest {
     assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList)
   }
 
+  @Test
+  def testDeleteTopicsByIdAuthorization(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+    EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED
+    ))
+
+    val topicIdsMap = Map(
+      Uuid.randomUuid() -> Some("foo"),
+      Uuid.randomUuid() -> Some("bar"),
+      Uuid.randomUuid() -> None
+    )
+
+    topicIdsMap.foreach { case (topicId, topicNameOpt) =>
+      EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
+    }
+
+    val topicDatas = topicIdsMap.keys.map { topicId =>
+      new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
+    }.toList
+    val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+      .setTopics(topicDatas.asJava))
+      .build(ApiKeys.DELETE_TOPICS.latestVersion)
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, controllerContext, authorizer)
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    topicIdsMap.foreach { case (topicId, nameOpt) =>
+      val response = deleteResponse.data.responses.asScala.find(_.topicId == topicId).get
+      nameOpt match {
+        case Some("foo") =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case Some("bar") =>
+          assertEquals("bar", response.name)
+          assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode))
+        case None =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case _ =>
+          fail("Unexpected topic id/name mapping")
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    val deleteRequest = if (usePrimitiveTopicNameArray) {
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopicNames(List("foo", "bar", "baz").asJava))
+        .build(5.toShort)
+    } else {
+      val topicDatas = List(
+        new DeleteTopicsRequestData.DeleteTopicState().setName("foo"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("bar"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("baz")
+      )
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopics(topicDatas.asJava))
+

[GitHub] [kafka] gharris1727 commented on pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-03-01 Thread GitBox


gharris1727 commented on pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#issuecomment-788250316


   Now that the bug is fixed, some of the comments are explicitly incorrect. For 
example:
   ```
   // 1. Lock the PluginClassLoader (via PluginClassLoader::loadClass)
   // This behavior is specific to the JVM, not the classloader implementation
   ```
   Many others state the wrong intent or are misleading. Should we fix some or 
all of these, or leave them as a detailed description of how the 
bug _did_ happen when it existed?
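For context on the locking these comments describe: the JDK lets a class loader opt into per-class-name locking by calling `ClassLoader.registerAsParallelCapable()` from a static initializer and then synchronizing on `getClassLoadingLock(name)` instead of on the loader itself. The following is a minimal, hypothetical child-first loader showing that pattern; it is a sketch, not the `PluginClassLoader` code from this PR.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical child-first loader, illustrating the locking pattern only.
class ParallelCapableLoader extends URLClassLoader {

    static {
        // Must run in the subclass's static initializer; registration succeeds only
        // if the superclass (URLClassLoader) is itself registered as parallel capable.
        registerAsParallelCapable();
    }

    ParallelCapableLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        // Lock per class name rather than on `this` (which a `synchronized` method
        // would do), so threads loading different classes do not contend on the loader.
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null && !name.startsWith("java.")) {
                try {
                    c = findClass(name); // child-first: look in our own URLs
                } catch (ClassNotFoundException e) {
                    // not found locally; fall through to parent delegation
                }
            }
            if (c == null) {
                c = super.loadClass(name, false);
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    // Exposes the lock object so callers can inspect it.
    Object lockFor(String name) {
        return getClassLoadingLock(name);
    }
}
```

Once registration succeeds, `getClassLoadingLock` returns a distinct lock object per class name instead of the loader instance.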







[jira] [Commented] (KAFKA-12394) Consider topic id existence and authorization errors

2021-03-01 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293155#comment-17293155
 ] 

Ismael Juma commented on KAFKA-12394:
-

{quote}There is an argument to be made for TOPIC_AUTHORIZATION_FAILED as well.
{quote}
I thought the KIP had specified ^. cc [~jolshan]

> Consider topic id existence and authorization errors
> 
>
> Key: KAFKA-12394
> URL: https://issues.apache.org/jira/browse/KAFKA-12394
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> We have historically had logic in the api layer to avoid leaking the 
> existence or non-existence of topics to clients which are not authorized to 
> describe them. The way we have done this is to always authorize the topic 
> name first before checking existence.
> Topic IDs make this more difficult because the resource (i.e., the topic name) 
> has to be derived. This means we have to check the existence of the topic first. 
> If the topic does not exist, then our hands are tied and we have to return 
> UNKNOWN_TOPIC_ID. If the topic does exist, then we need to check whether the 
> client is authorized to describe it. The question, then, is what we should 
> do if the client is not authorized.
> The current behavior is to return UNKNOWN_TOPIC_ID. The downside is that this 
> is misleading and forces the client to retry even though they are doomed to 
> hit the same error. However, the client should generally handle this by 
> requesting Metadata using the topic name that they are interested in, which 
> would give them a chance to see the topic authorization error. Basically the 
> fact that you need describe permission in the first place to discover the 
> topic id makes this an unlikely scenario.
> There is an argument to be made for TOPIC_AUTHORIZATION_FAILED as well. 
> Basically we could take the stance that we do not care about leaking the 
> existence of topic IDs since they do not reveal anything about the underlying 
> topic. Additionally, there is little likelihood of a user discovering a valid 
> UUID by accident or even through brute force. The benefit of this is that 
> users get a clear error for cases where a topic ID may have been discovered 
> through some external means. For example, an administrator finds a topic ID 
> in the logging and attempts to delete it using the new `deleteTopicsWithIds` 
> Admin API.





[GitHub] [kafka] gharris1727 commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-03-01 Thread GitBox


gharris1727 commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r585020737



##
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
##
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+    public static final Logger log = LoggerFactory.getLogger(SynchronizationTest.class);
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    private String threadPrefix;
+    private Plugins plugins;
+    private ThreadPoolExecutor exec;
+    private Breakpoint<String> dclBreakpoint;
+    private Breakpoint<String> pclBreakpoint;
+
+    @Before
+    public void setup() {
+        TestPlugins.assertAvailable();
+        Map<String, String> pluginProps = Collections.singletonMap(
+            WorkerConfig.PLUGIN_PATH_CONFIG,
+            String.join(",", TestPlugins.pluginPath())
+        );
+        threadPrefix = SynchronizationTest.class.getSimpleName()
+            + "." + testName.getMethodName() + "-";
+        dclBreakpoint = new Breakpoint<>();
+        pclBreakpoint = new Breakpoint<>();
+        plugins = new Plugins(pluginProps) {
+            @Override
+            protected DelegatingClassLoader newDelegatingClassLoader(List<String> paths) {
+                return AccessController.doPrivileged(
+                    (PrivilegedAction<DelegatingClassLoader>) () ->
+                        new SynchronizedDelegatingClassLoader(paths)
+                );
+            }
+        };
+        exec = new ThreadPoolExecutor(
+            2,
+            2,
+            1000L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingDeque<>(),
+            threadFactoryWithNamedThreads(threadPrefix)
+        );
+
+    }
+
+    @After
+    public void tearDown() throws InterruptedException {
+        dclBreakpoint.clear();
+        pclBreakpoint.clear();
+        exec.shutdown();
+        exec.awaitTermination(1L, TimeUnit.SECONDS);
+    }
+
+    private static class Breakpoint<T> {
+
+        private Predicate<T> predicate;
+        private CyclicBarrier barrier;
+
+        public synchronized void clear() {
+            if (barrier != null) {
+                barrier.reset();
+            }
+            predicate = null;
+            barrier = null;
+        }
+
+        public synchronized void set(Predicate<T> predicate) {
+            clear();
+            this.predicate = predicate;
+            // As soon as the barrier is tripped, the barrier will be reset for the next round.
+            barrier = new CyclicBarrier(2);
+        }
+
+        /**
+         * From a thread under test, wait for the test orchestrator to continue execution
+         * @param obj Object to test with the breakpoint's current predicate
+         */
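
The `Breakpoint` helper is truncated in this message. As a hypothetical stand-alone illustration of the general idea (a predicate-gated `CyclicBarrier` rendezvous between a thread under test and the test orchestrator), and not a reconstruction of the PR's code, a sketch could look like this. `GateBreakpoint`, `await`, and `testAwait` are illustrative names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Hypothetical stand-alone sketch of a predicate-gated breakpoint.
class GateBreakpoint<T> {

    private Predicate<T> predicate;
    private CyclicBarrier barrier;

    // Arms the breakpoint: only calls whose argument matches `predicate` will pause.
    synchronized void set(Predicate<T> predicate) {
        this.predicate = predicate;
        this.barrier = new CyclicBarrier(2);
    }

    // Called from the thread under test. The monitor is released before blocking
    // on the barrier, so the orchestrator can still call set() or testAwait().
    void await(T obj) {
        Predicate<T> p;
        CyclicBarrier b;
        synchronized (this) {
            p = predicate;
            b = barrier;
        }
        if (p == null || !p.test(obj)) {
            return;
        }
        try {
            b.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            // the orchestrator reset or abandoned the barrier; let the caller continue
        }
    }

    // Called from the orchestrator (after set()): rendezvous with the paused thread.
    void testAwait(long timeoutMs)
            throws InterruptedException, BrokenBarrierException, TimeoutException {
        CyclicBarrier b;
        synchronized (this) {
            b = barrier;
        }
        b.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```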

[GitHub] [kafka] hachikuji edited a comment on pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

2021-03-01 Thread GitBox


hachikuji edited a comment on pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#issuecomment-788239224


   @jolshan @chia7712 I don't think `KafkaApis` has been rewritten yet. My 
suggestion is probably to get this checked in as is, so that we can definitely get 
it into 2.8. Then we should consider how to consolidate the handling logic in 
`ControllerApis` and `KafkaApis` after #10184 is merged.







[GitHub] [kafka] hachikuji commented on pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

2021-03-01 Thread GitBox


hachikuji commented on pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#issuecomment-788239224


   @jolshan @chia7712 I don't think `KafkaApis` has been rewritten yet. My 
suggestion is probably to get this checked in as is, so that we can definitely get 
it into 2.8. Then we should consider how to consolidate the handling logic in 
`ControllerApis` and `KafkaApis`.







[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585017852



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1874,7 +1874,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
   throw new InvalidRequestException("Topic name and topic ID can not both be specified.")
 val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
-else zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
+else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull

Review comment:
   I mainly did this for easier mocking. I would like to see reduced access 
for `topicNames`, but that feels like something to address separately.
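A hypothetical Java sketch of the design choice: a narrow `topicName(id)` accessor that returns an `Optional` keeps the backing map private and gives mocks a single method to stub. `TopicIdRegistry` is an illustrative name, not Kafka's `ControllerContext`, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical class illustrating a narrow accessor; not Kafka's ControllerContext.
class TopicIdRegistry {

    // Kept private, so callers cannot depend on the map itself.
    private final Map<UUID, String> topicNames = new HashMap<>();

    void add(UUID id, String name) {
        topicNames.put(id, name);
    }

    // Single lookup method: easy to stub in tests, no direct map access.
    Optional<String> topicName(UUID id) {
        return Optional.ofNullable(topicNames.get(id));
    }
}
```

Here `topicName(id).orElse(null)` plays the role of `.orNull` in the Scala diff.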









[GitHub] [kafka] guozhangwang commented on pull request #10205: KAFKA-12323 Follow-up: Refactor the unit test a bit

2021-03-01 Thread GitBox


guozhangwang commented on pull request #10205:
URL: https://github.com/apache/kafka/pull/10205#issuecomment-788238202


   Cherry-picked to 2.8 and 2.7, cc RM @vvcephei 







[GitHub] [kafka] guozhangwang merged pull request #10205: KAFKA-12323 Follow-up: Refactor the unit test a bit

2021-03-01 Thread GitBox


guozhangwang merged pull request #10205:
URL: https://github.com/apache/kafka/pull/10205


   







[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585014029



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
   val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
 results.asScala.filter(result => result.name() != null))(_.name)
   results.forEach { topic =>
-val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
- if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-   topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-   topic.setErrorMessage("Topic IDs are not supported on the server.")
- } else if (unresolvedTopicId)
- topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-   topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (!authorizedDeleteTopics.contains(topic.name))
-   topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
- else if (!metadataCache.contains(topic.name))
-   topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- else
-   toDelete += topic.name
+val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+  topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+  topic.setErrorMessage("Topic IDs are not supported on the server.")
+} else if (unresolvedTopicId) {
+  topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+} else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+  // Because the client does not have Describe permission, the name should
+  // not be returned in the response.
+  topic.setName(null)
+  topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)

Review comment:
   I made the comment before fully realizing how topic IDs were 
complicating the matter. I filed this JIRA to discuss further: 
https://issues.apache.org/jira/browse/KAFKA-12394. I'd suggest that we keep the 
current behavior in this PR and fix the small issue. It would be good to have 
the tests in any case.
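To summarize the order of checks in the diff above, here is a simplified, hypothetical Java model. It is not `KafkaApis` itself: authorization and metadata lookups are reduced to plain sets, and `DeleteTopicDecision`, `Entry`, and `Outcome` are illustrative names. It follows the current behavior the comment suggests keeping: a topic ID the client cannot describe gets UNKNOWN_TOPIC_ID and its name is withheld.

```java
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified model of the per-topic checks; not KafkaApis itself.
class DeleteTopicDecision {

    enum Outcome {
        UNSUPPORTED_VERSION, UNKNOWN_TOPIC_ID, TOPIC_AUTHORIZATION_FAILED,
        UNKNOWN_TOPIC_OR_PARTITION, DELETE
    }

    static final class Entry {
        final UUID requestedId; // null when the request named the topic directly
        String name;            // resolved name; null if an ID could not be resolved

        Entry(UUID requestedId, String name) {
            this.requestedId = requestedId;
            this.name = name;
        }
    }

    static Outcome decide(Entry topic, boolean serverUsesTopicIds, Set<String> describable,
                          Set<String> deletable, Set<String> existing) {
        boolean byId = topic.requestedId != null;
        if (byId && !serverUsesTopicIds) {
            return Outcome.UNSUPPORTED_VERSION;
        } else if (byId && topic.name == null) {
            return Outcome.UNKNOWN_TOPIC_ID;           // the ID did not resolve to a topic
        } else if (byId && !describable.contains(topic.name)) {
            topic.name = null;                         // do not reveal the name
            return Outcome.UNKNOWN_TOPIC_ID;
        } else if (!deletable.contains(topic.name)) {
            return Outcome.TOPIC_AUTHORIZATION_FAILED;
        } else if (!existing.contains(topic.name)) {
            return Outcome.UNKNOWN_TOPIC_OR_PARTITION;
        } else {
            return Outcome.DELETE;
        }
    }
}
```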









[jira] [Created] (KAFKA-12394) Consider topic id existence and authorization errors

2021-03-01 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12394:
---

 Summary: Consider topic id existence and authorization errors
 Key: KAFKA-12394
 URL: https://issues.apache.org/jira/browse/KAFKA-12394
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


We have historically had logic in the api layer to avoid leaking the existence 
or non-existence of topics to clients which are not authorized to describe 
them. The way we have done this is to always authorize the topic name first 
before checking existence.

Topic IDs make this more difficult because the resource (i.e., the topic name) has 
to be derived. This means we have to check the existence of the topic first. If the 
topic does not exist, then our hands are tied and we have to return 
UNKNOWN_TOPIC_ID. If the topic does exist, then we need to check whether the client 
is authorized to describe it. The question, then, is what we should do if the 
client is not authorized.

The current behavior is to return UNKNOWN_TOPIC_ID. The downside is that this 
is misleading and forces the client to retry even though they are doomed to hit 
the same error. However, the client should generally handle this by requesting 
Metadata using the topic name that they are interested in, which would give 
them a chance to see the topic authorization error. Basically the fact that you 
need describe permission in the first place to discover the topic id makes this 
an unlikely scenario.

There is an argument to be made for TOPIC_AUTHORIZATION_FAILED as well. 
Basically we could take the stance that we do not care about leaking the 
existence of topic IDs since they do not reveal anything about the underlying 
topic. Additionally, there is little likelihood of a user discovering a valid 
UUID by accident or even through brute force. The benefit of this is that users 
get a clear error for cases where a topic ID may have been discovered through 
some external means. For example, an administrator finds a topic ID in the 
logging and attempts to delete it using the new `deleteTopicsWithIds` Admin API.
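The two options for an existing but non-describable topic ID can be sketched as a single policy flag. This is a hypothetical model (`TopicIdErrorPolicy` and `revealIdExistence` are illustrative names, not Kafka code) that captures only the ordering constraint described above: the ID must be resolved before the name can be authorized.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch of the two policies discussed in KAFKA-12394.
class TopicIdErrorPolicy {

    enum IdError { NONE, UNKNOWN_TOPIC_ID, TOPIC_AUTHORIZATION_FAILED }

    static IdError check(UUID id, Map<UUID, String> idToName, Set<String> describable,
                         boolean revealIdExistence) {
        // Existence is checked first: the ACL resource (the topic name) is only
        // known once the ID has been resolved.
        String name = idToName.get(id);
        if (name == null) {
            return IdError.UNKNOWN_TOPIC_ID; // no choice: the topic does not exist
        }
        if (!describable.contains(name)) {
            // Current behavior: report the ID as unknown.
            // Alternative: report the authorization failure, revealing only that the ID exists.
            return revealIdExistence ? IdError.TOPIC_AUTHORIZATION_FAILED : IdError.UNKNOWN_TOPIC_ID;
        }
        return IdError.NONE;
    }
}
```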





[GitHub] [kafka] rondagostino commented on pull request #10225: MINOR: fix security_test for ZK case due to error change

2021-03-01 Thread GitBox


rondagostino commented on pull request #10225:
URL: https://github.com/apache/kafka/pull/10225#issuecomment-788226851


   This PR "fixes" the issue by changing the system test to check for the new 
error.  We can close this PR in favor of another one if @abbccdda decides the 
behavior change is incorrect and can be corrected.







[GitHub] [kafka] rondagostino commented on pull request #10225: MINOR: fix security_test for ZK case due to error change

2021-03-01 Thread GitBox


rondagostino commented on pull request #10225:
URL: https://github.com/apache/kafka/pull/10225#issuecomment-788225857


   https://issues.apache.org/jira/browse/KAFKA-12381







[GitHub] [kafka] guozhangwang merged pull request #10231: MINOR: Remove stack trace of the lock exception in a debug log4j

2021-03-01 Thread GitBox


guozhangwang merged pull request #10231:
URL: https://github.com/apache/kafka/pull/10231


   






