[kafka] branch 3.3 updated: KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)

2022-08-17 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
 new 81c4426550 KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
81c4426550 is described below

commit 81c442655079355f859dcdb495df015a1a1f7baa
Author: Jason Gustafson 
AuthorDate: Wed Aug 17 18:11:42 2022 -0700

KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)

There are a few cases in `ControllerApis` where we may see an `ApiException` wrapped as a `CompletionException`. This can happen in `QuorumController.allocateProducerIds` where the returned future is the result of calling `thenApply` on the future passed to the controller. The danger when this happens is that the `CompletionException` gets passed to `Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`. At a minimum, I found that the `AllocateProducerIds` and `Update [...]

Interestingly, `DeleteTopics` is not affected by this bug as I originally suspected. This is because we have logic in `ApiError.fromThrowable` to check for both `CompletionException` and `ExecutionException` and to pull out the underlying cause. This patch duplicates this logic from `ApiError.fromThrowable` into `Errors.forException` to be sure that we handle all cases where exceptions are converted to error codes.
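
To make the failure mode concrete, a minimal sketch (not from the patch; the `NotControllerException` cause and the class name are illustrative) of how `thenApply` surfaces an `ApiException` as a `CompletionException`, and how the unwrapping added here restores the intended error code:
```
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.protocol.Errors;

public class CompletionExceptionSketch {
    public static void main(String[] args) {
        // A controller-style future that fails with an ApiException.
        CompletableFuture<Long> fromController = new CompletableFuture<>();
        fromController.completeExceptionally(new NotControllerException("not the active controller"));

        // A dependent stage (as created by thenApply) raises the failure wrapped in a
        // CompletionException rather than the ApiException itself.
        Throwable raised = fromController
            .thenApply(id -> id + 1)
            .handle((value, error) -> error)
            .join();
        System.out.println(raised);  // java.util.concurrent.CompletionException: ... NotControllerException ...

        // Errors.forException used to see only the wrapper and fall back to
        // UNKNOWN_SERVER_ERROR; with this patch it unwraps the cause first.
        System.out.println(Errors.maybeUnwrapException(raised).getClass().getSimpleName());  // NotControllerException
        System.out.println(Errors.forException(raised));  // NOT_CONTROLLER (previously UNKNOWN_SERVER_ERROR)
    }
}
```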

Reviewers: David Arthur 
---
 .../org/apache/kafka/common/protocol/Errors.java   | 21 -
 .../requests/AllocateProducerIdsResponse.java  |  4 +
 .../org/apache/kafka/common/requests/ApiError.java | 10 +--
 .../apache/kafka/common/requests/ApiErrorTest.java |  6 +-
 .../main/scala/kafka/server/ControllerApis.scala   |  7 +-
 .../test/junit/RaftClusterInvocationContext.java   |  4 +-
 .../server/AllocateProducerIdsRequestTest.scala| 98 ++
 .../unit/kafka/server/ControllerApisTest.scala | 78 -
 8 files changed, 205 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 2ca42bafcf..c220bbcde4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -132,6 +132,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
 /**
@@ -469,7 +471,8 @@ public enum Errors {
      * If there are multiple matches in the class hierarchy, the first match starting from the bottom is used.
      */
     public static Errors forException(Throwable t) {
-        Class<?> clazz = t.getClass();
+        Throwable cause = maybeUnwrapException(t);
+        Class<?> clazz = cause.getClass();
         while (clazz != null) {
             Errors error = classToError.get(clazz);
             if (error != null)
@@ -479,6 +482,22 @@ public enum Errors {
         return UNKNOWN_SERVER_ERROR;
     }
 
+    /**
+     * Check if a Throwable is a commonly wrapped exception type (e.g. `CompletionException`) and return
+     * the cause if so. This is useful to handle cases where exceptions may be raised from a future or a
+     * completion stage (as might be the case for requests sent to the controller in `ControllerApis`).
+     *
+     * @param t The Throwable to check
+     * @return The throwable itself or its cause if it is an instance of a commonly wrapped exception type
+     */
+    public static Throwable maybeUnwrapException(Throwable t) {
+        if (t instanceof CompletionException || t instanceof ExecutionException) {
+            return t.getCause();
+        } else {
+            return t;
+        }
+    }
+
 private static String toHtml() {
 final StringBuilder b = new StringBuilder();
 b.append("\n");
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
index 5d48c39e80..41db29158e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
@@ -56,6 +56,10 @@ public class AllocateProducerIdsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    public Errors error() {
+        return Errors.forCode(data.errorCode());
+    }
+
     public static AllocateProducerIdsResponse parse(ByteBuffer buffer, short version) {
         return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData(
             new 

[kafka] branch trunk updated: KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)

2022-08-17 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new bc90c29faf KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
bc90c29faf is described below

commit bc90c29fafc69747daeecada8bb0c347e138edc8
Author: Jason Gustafson 
AuthorDate: Wed Aug 17 18:11:42 2022 -0700

KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)

There are a few cases in `ControllerApis` where we may see an `ApiException` wrapped as a `CompletionException`. This can happen in `QuorumController.allocateProducerIds` where the returned future is the result of calling `thenApply` on the future passed to the controller. The danger when this happens is that the `CompletionException` gets passed to `Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`. At a minimum, I found that the `AllocateProducerIds` and `Update [...]

Interestingly, `DeleteTopics` is not affected by this bug as I originally suspected. This is because we have logic in `ApiError.fromThrowable` to check for both `CompletionException` and `ExecutionException` and to pull out the underlying cause. This patch duplicates this logic from `ApiError.fromThrowable` into `Errors.forException` to be sure that we handle all cases where exceptions are converted to error codes.
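
The same unwrapping also covers `ExecutionException`, the wrapper produced by blocking `Future.get()` calls; a small hedged sketch (class name and message are illustrative):
```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.protocol.Errors;

public class ExecutionExceptionSketch {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new InvalidRequestException("bad request"));

        try {
            future.get();  // blocking get() wraps the cause in an ExecutionException
        } catch (ExecutionException e) {
            // With this patch both calls agree on the underlying error code.
            System.out.println(Errors.forException(e));             // INVALID_REQUEST
            System.out.println(Errors.forException(e.getCause()));  // INVALID_REQUEST
        }
    }
}
```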

Reviewers: David Arthur 
---
 .../org/apache/kafka/common/protocol/Errors.java   | 21 -
 .../requests/AllocateProducerIdsResponse.java  |  4 +
 .../org/apache/kafka/common/requests/ApiError.java | 10 +--
 .../apache/kafka/common/requests/ApiErrorTest.java |  6 +-
 .../main/scala/kafka/server/ControllerApis.scala   |  7 +-
 .../test/junit/RaftClusterInvocationContext.java   |  4 +-
 .../server/AllocateProducerIdsRequestTest.scala| 98 ++
 .../unit/kafka/server/ControllerApisTest.scala | 78 -
 8 files changed, 205 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 2ca42bafcf..c220bbcde4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -132,6 +132,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
 /**
@@ -469,7 +471,8 @@ public enum Errors {
      * If there are multiple matches in the class hierarchy, the first match starting from the bottom is used.
      */
     public static Errors forException(Throwable t) {
-        Class<?> clazz = t.getClass();
+        Throwable cause = maybeUnwrapException(t);
+        Class<?> clazz = cause.getClass();
         while (clazz != null) {
             Errors error = classToError.get(clazz);
             if (error != null)
@@ -479,6 +482,22 @@ public enum Errors {
         return UNKNOWN_SERVER_ERROR;
     }
 
+    /**
+     * Check if a Throwable is a commonly wrapped exception type (e.g. `CompletionException`) and return
+     * the cause if so. This is useful to handle cases where exceptions may be raised from a future or a
+     * completion stage (as might be the case for requests sent to the controller in `ControllerApis`).
+     *
+     * @param t The Throwable to check
+     * @return The throwable itself or its cause if it is an instance of a commonly wrapped exception type
+     */
+    public static Throwable maybeUnwrapException(Throwable t) {
+        if (t instanceof CompletionException || t instanceof ExecutionException) {
+            return t.getCause();
+        } else {
+            return t;
+        }
+    }
+
 private static String toHtml() {
 final StringBuilder b = new StringBuilder();
 b.append("\n");
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
index 5d48c39e80..41db29158e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
@@ -56,6 +56,10 @@ public class AllocateProducerIdsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    public Errors error() {
+        return Errors.forCode(data.errorCode());
+    }
+
     public static AllocateProducerIdsResponse parse(ByteBuffer buffer, short version) {
         return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData(
             new 

[kafka] branch 3.3 updated: KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)

2022-08-17 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
 new 0ac9c3498c KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)
0ac9c3498c is described below

commit 0ac9c3498c761167db1ead46d89dea71b29981d6
Author: Jason Gustafson 
AuthorDate: Wed Aug 17 15:48:32 2022 -0700

KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)

Currently the server will return `INVALID_REQUEST` if a `DescribeQuorum` request is sent to a node that is not the current leader. In addition to being inconsistent with all of the other leader APIs in the raft layer, this error is treated as fatal by both the forwarding manager and the admin client. Instead, we should return `NOT_LEADER_OR_FOLLOWER` as we do with the other APIs. This error is retriable, and we can rely on the admin client to retry the request after seeing it.
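
For illustration, a hedged sketch of how a non-leader can now build the partition-level response with the `singletonErrorResponse` helper added in this patch (the class name and standalone `main` are illustrative; the real call site is the raft request handling path):
```
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeQuorumResponse;

public class NonLeaderDescribeQuorumSketch {
    public static void main(String[] args) {
        // The metadata log partition targeted by DescribeQuorum.
        TopicPartition metadataPartition = new TopicPartition("__cluster_metadata", 0);

        // A node that is not the current leader now answers with a retriable
        // partition-level error instead of INVALID_REQUEST.
        DescribeQuorumResponseData data = DescribeQuorumResponse.singletonErrorResponse(
            metadataPartition,
            Errors.NOT_LEADER_OR_FOLLOWER
        );
        System.out.println(data);

        // The data is then wrapped into the response object sent back to the client.
        DescribeQuorumResponse response = new DescribeQuorumResponse(data);
        System.out.println(response.errorCounts());
    }
}
```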

Reviewers: David Jacot 
---
 .../common/requests/DescribeQuorumResponse.java| 16 +++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 23 +
 .../org/apache/kafka/raft/KafkaRaftClient.java |  5 +++-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 29 ++
 .../apache/kafka/raft/RaftClientTestContext.java   | 27 ++--
 5 files changed, 85 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
index cbf945b704..06ae681bc5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
@@ -37,7 +37,7 @@ import java.util.Map;
  * - {@link Errors#BROKER_NOT_AVAILABLE}
  *
  * Partition level errors:
- * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER}
  * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
  */
 public class DescribeQuorumResponse extends AbstractResponse {
@@ -72,6 +72,19 @@ public class DescribeQuorumResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    public static DescribeQuorumResponseData singletonErrorResponse(
+        TopicPartition topicPartition,
+        Errors error
+    ) {
+        return new DescribeQuorumResponseData()
+            .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData()
+                .setTopicName(topicPartition.topic())
+                .setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData()
+                    .setPartitionIndex(topicPartition.partition())
+                    .setErrorCode(error.code())))));
+    }
+
+
     public static DescribeQuorumResponseData singletonResponse(TopicPartition topicPartition,
                                                                 int leaderId,
                                                                 int leaderEpoch,
@@ -82,6 +95,7 @@ public class DescribeQuorumResponse extends AbstractResponse {
             .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData()
                 .setTopicName(topicPartition.topic())
                 .setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData()
+                    .setPartitionIndex(topicPartition.partition())
                     .setErrorCode(Errors.NONE.code())
                     .setLeaderId(leaderId)
                     .setLeaderEpoch(leaderEpoch)
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index de57813679..5faf53f075 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5192,6 +5192,29 @@ public class KafkaAdminClientTest {
 }
 }
 
+    @Test
+    public void testDescribeMetadataQuorumRetriableError() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+
+            // First request fails with a NOT_LEADER_OR_FOLLOWER error (which is retriable)
+            env.kafkaClient().prepareResponse(
+                body -> body instanceof DescribeQuorumRequest,
+                prepareDescribeQuorumResponse(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, false, false, false, false, false));
+
+            // The second request succeeds
+  

[kafka] branch trunk updated: KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)

2022-08-17 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new e5b865d6bf KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)
e5b865d6bf is described below

commit e5b865d6bf37495e9949878c8206b9459aa5e1f4
Author: Jason Gustafson 
AuthorDate: Wed Aug 17 15:48:32 2022 -0700

KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)

Currently the server will return `INVALID_REQUEST` if a `DescribeQuorum` request is sent to a node that is not the current leader. In addition to being inconsistent with all of the other leader APIs in the raft layer, this error is treated as fatal by both the forwarding manager and the admin client. Instead, we should return `NOT_LEADER_OR_FOLLOWER` as we do with the other APIs. This error is retriable, and we can rely on the admin client to retry the request after seeing it.
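
On the client side, the reason the admin client can simply retry is that `NOT_LEADER_OR_FOLLOWER` maps to a retriable exception while `INVALID_REQUEST` does not; a small hedged sketch of that mapping (class name is illustrative):
```
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.protocol.Errors;

public class RetriableErrorSketch {
    public static void main(String[] args) {
        ApiException notLeader = Errors.NOT_LEADER_OR_FOLLOWER.exception();
        ApiException invalidRequest = Errors.INVALID_REQUEST.exception();

        // NOT_LEADER_OR_FOLLOWER is retriable, so the admin client can retry the request;
        // INVALID_REQUEST is not, and was previously treated as fatal.
        System.out.println(notLeader instanceof RetriableException);       // true
        System.out.println(invalidRequest instanceof RetriableException);  // false
    }
}
```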

Reviewers: David Jacot 
---
 .../common/requests/DescribeQuorumResponse.java| 16 +++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 23 +
 .../org/apache/kafka/raft/KafkaRaftClient.java |  5 +++-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 29 ++
 .../apache/kafka/raft/RaftClientTestContext.java   | 27 ++--
 5 files changed, 85 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
index cbf945b704..06ae681bc5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
@@ -37,7 +37,7 @@ import java.util.Map;
  * - {@link Errors#BROKER_NOT_AVAILABLE}
  *
  * Partition level errors:
- * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER}
  * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
  */
 public class DescribeQuorumResponse extends AbstractResponse {
@@ -72,6 +72,19 @@ public class DescribeQuorumResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    public static DescribeQuorumResponseData singletonErrorResponse(
+        TopicPartition topicPartition,
+        Errors error
+    ) {
+        return new DescribeQuorumResponseData()
+            .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData()
+                .setTopicName(topicPartition.topic())
+                .setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData()
+                    .setPartitionIndex(topicPartition.partition())
+                    .setErrorCode(error.code())))));
+    }
+
+
     public static DescribeQuorumResponseData singletonResponse(TopicPartition topicPartition,
                                                                 int leaderId,
                                                                 int leaderEpoch,
@@ -82,6 +95,7 @@ public class DescribeQuorumResponse extends AbstractResponse {
             .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData()
                 .setTopicName(topicPartition.topic())
                 .setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData()
+                    .setPartitionIndex(topicPartition.partition())
                     .setErrorCode(Errors.NONE.code())
                     .setLeaderId(leaderId)
                     .setLeaderEpoch(leaderEpoch)
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index de57813679..5faf53f075 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5192,6 +5192,29 @@ public class KafkaAdminClientTest {
 }
 }
 
+    @Test
+    public void testDescribeMetadataQuorumRetriableError() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+
+            // First request fails with a NOT_LEADER_OR_FOLLOWER error (which is retriable)
+            env.kafkaClient().prepareResponse(
+                body -> body instanceof DescribeQuorumRequest,
+                prepareDescribeQuorumResponse(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, false, false, false, false, false));
+
+            // The second request succeeds
+  

[kafka] branch trunk updated: HOTFIX: Revert KAFKA-10199 which is causing compilation failures (#12532)

2022-08-17 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 0243bb98a7 HOTFIX: Revert KAFKA-10199 which is causing compilation failures (#12532)
0243bb98a7 is described below

commit 0243bb98a7cc6bc4ed3c8f39b3a1ed9f70d0f8bd
Author: Jason Gustafson 
AuthorDate: Wed Aug 17 14:29:49 2022 -0700

HOTFIX: Revert KAFKA-10199 which is causing compilation failures (#12532)

Compilation is failing after these two commits:
```
> Task :streams:compileJava

/Users/jgustafson/Projects/kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:852: error: cannot find symbol
tasks.addPendingTaskToClose(restoringTask.id());
 ^
  symbol:   method addPendingTaskToClose(org.apache.kafka.streams.processor.TaskId)
  location: variable tasks of type org.apache.kafka.streams.processor.internals.Tasks
1 error
```

Also here:
```
[2022-08-17T20:58:20.912Z] > Task :streams:compileTestJava

[2022-08-17T20:58:20.912Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12530/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:822: error: method setupForRevocation(Set,Set) is already defined in class TaskManagerTest

[2022-08-17T20:58:20.912Z] private TaskManager setupForRevocation(final Set tasksInStateUpdater,
```
This patch reverts them.

Reviewers: Ismael Juma 
---
 .../streams/processor/internals/TaskManager.java   |  42 +--
 .../kafka/streams/processor/internals/Tasks.java   |  19 +--
 .../processor/internals/TaskManagerTest.java   | 129 +
 3 files changed, 12 insertions(+), 178 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bab05a5184..03c36b0daf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -465,7 +465,7 @@ public class TaskManager {
 standbyTasksToCreate.remove(taskId);
 } else {
 stateUpdater.remove(taskId);
-tasks.addPendingTaskToCloseClean(taskId);
+tasks.addPendingTaskToClose(taskId);
 }
 }
 }
@@ -692,7 +692,7 @@ public class TaskManager {
 
 taskExceptions.putIfAbsent(taskId, e);
 }
-} else if (tasks.removePendingTaskToCloseClean(task.id())) {
+} else if (tasks.removePendingTaskToClose(task.id())) {
 try {
 task.suspend();
 task.closeClean();
@@ -710,8 +710,6 @@ public class TaskManager {
 
 taskExceptions.putIfAbsent(task.id(), e);
 }
-} else if (tasks.removePendingTaskToCloseDirty(task.id())) {
-tasksToCloseDirty.add(task);
             } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
 stateUpdater.add(task);
@@ -757,8 +755,6 @@ public class TaskManager {
 }
 }
 
-removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
-
 if (!remainingRevokedPartitions.isEmpty()) {
             log.debug("The following revoked partitions {} are missing from the current task partitions. It could "
                   + "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " +
@@ -844,20 +840,6 @@ public class TaskManager {
 }
 }
 
-    private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remainingRevokedPartitions) {
-        if (stateUpdater != null) {
-            for (final Task restoringTask : stateUpdater.getTasks()) {
-                if (restoringTask.isActive()) {
-                    if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToClose(restoringTask.id());
-                        stateUpdater.remove(restoringTask.id());
-                        remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
-                    }
-                }
-            }
-        }
-    }
-
     private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
                                                  final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
         for (final Task task : tasksToPrepare) {
@@ -885,15 +867,6 @@ public class TaskManager 

[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on revocation (#12520)

2022-08-17 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new b47c4d8598 KAFKA-10199: Remove tasks from state updater on revocation (#12520)
b47c4d8598 is described below

commit b47c4d859805068de6a8fe8de3bda5e7a21132e2
Author: Bruno Cadonna 
AuthorDate: Wed Aug 17 20:13:34 2022 +0200

KAFKA-10199: Remove tasks from state updater on revocation (#12520)

Removes tasks from the state updater when the input partitions of the tasks are revoked during a rebalance.
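
A restoring task is only removed from the state updater (and later closed cleanly) when all of its input partitions were revoked; a hedged, self-contained sketch of that predicate with made-up topic names:
```
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

public class RevocationPredicateSketch {
    public static void main(String[] args) {
        Set<TopicPartition> taskInputPartitions = new HashSet<>(Arrays.asList(
            new TopicPartition("input", 0),
            new TopicPartition("input", 1)));

        // Full revocation: the task has no partitions left, so it can leave the state updater.
        Set<TopicPartition> revokedAll = new HashSet<>(taskInputPartitions);
        System.out.println(revokedAll.containsAll(taskInputPartitions));   // true -> remove and close clean

        // Partial revocation: the task still owns a partition, so it keeps restoring.
        Set<TopicPartition> revokedSome = new HashSet<>(Arrays.asList(new TopicPartition("input", 0)));
        System.out.println(revokedSome.containsAll(taskInputPartitions));  // false -> stays in the state updater
    }
}
```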

Reviewers: Guozhang Wang 
---
 .../streams/processor/internals/TaskManager.java   | 16 +
 .../processor/internals/TaskManagerTest.java   | 81 ++
 2 files changed, 97 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4bba28a3f3..bab05a5184 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -757,6 +757,8 @@ public class TaskManager {
 }
 }
 
+removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
+
 if (!remainingRevokedPartitions.isEmpty()) {
             log.debug("The following revoked partitions {} are missing from the current task partitions. It could "
                   + "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " +
@@ -842,6 +844,20 @@ public class TaskManager {
 }
 }
 
+    private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remainingRevokedPartitions) {
+        if (stateUpdater != null) {
+            for (final Task restoringTask : stateUpdater.getTasks()) {
+                if (restoringTask.isActive()) {
+                    if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
+                        tasks.addPendingTaskToClose(restoringTask.id());
+                        stateUpdater.remove(restoringTask.id());
+                        remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
+                    }
+                }
+            }
+        }
+    }
+
     private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
                                                  final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
 for (final Task task : tasksToPrepare) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 133541bfac..ff52ad5ae9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -751,6 +751,87 @@ public class TaskManagerTest {
 assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
 }
 
+    @Test
+    public void shouldRemoveStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() {
+        final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task), mkSet(task));
+
+        taskManager.handleRevocation(taskId00Partitions);
+
+        Mockito.verify(stateUpdater).remove(task.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task).closeClean();
+    }
+
+    public void shouldRemoveMultipleStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() {
+        final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId01Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task1, task2), mkSet(task1, task2));
+
+        taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions));
+
+        Mockito.verify(stateUpdater).remove(task1.id());
+        Mockito.verify(stateUpdater).remove(task2.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task1).closeClean();
+        Mockito.verify(task2).closeClean();
+    }
+
+    @Test
+    public void shouldNotRemoveStatefulTaskWithoutRevokedInputPartitionsFromStateUpdaterOnRevocation() {
+ 

[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on partition lost (#12521)

2022-08-17 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 9f20f89953 KAFKA-10199: Remove tasks from state updater on partition lost (#12521)
9f20f89953 is described below

commit 9f20f8995399d9e03f518f7b9c8be2bffb2fdcfc
Author: Bruno Cadonna 
AuthorDate: Wed Aug 17 20:12:30 2022 +0200

KAFKA-10199: Remove tasks from state updater on partition lost (#12521)

Removes tasks from the state updater when the input partitions of the tasks are lost during a rebalance.

Reviewers: Guozhang Wang 
---
 .../streams/processor/internals/TaskManager.java   | 26 ++--
 .../kafka/streams/processor/internals/Tasks.java   | 19 ++---
 .../processor/internals/TaskManagerTest.java   | 48 --
 3 files changed, 81 insertions(+), 12 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 03c36b0daf..4bba28a3f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -465,7 +465,7 @@ public class TaskManager {
 standbyTasksToCreate.remove(taskId);
 } else {
 stateUpdater.remove(taskId);
-tasks.addPendingTaskToClose(taskId);
+tasks.addPendingTaskToCloseClean(taskId);
 }
 }
 }
@@ -692,7 +692,7 @@ public class TaskManager {
 
 taskExceptions.putIfAbsent(taskId, e);
 }
-} else if (tasks.removePendingTaskToClose(task.id())) {
+} else if (tasks.removePendingTaskToCloseClean(task.id())) {
 try {
 task.suspend();
 task.closeClean();
@@ -710,6 +710,8 @@ public class TaskManager {
 
 taskExceptions.putIfAbsent(task.id(), e);
 }
+} else if (tasks.removePendingTaskToCloseDirty(task.id())) {
+tasksToCloseDirty.add(task);
             } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
 stateUpdater.add(task);
@@ -867,6 +869,15 @@ public class TaskManager {
 void handleLostAll() {
 log.debug("Closing lost active tasks as zombies.");
 
+closeRunningTasksDirty();
+removeLostTasksFromStateUpdater();
+
+if (processingMode == EXACTLY_ONCE_V2) {
+activeTaskCreator.reInitializeThreadProducer();
+}
+}
+
+private void closeRunningTasksDirty() {
 final Set allTask = tasks.allTasks();
 for (final Task task : allTask) {
 // Even though we've apparently dropped out of the group, we can 
continue safely to maintain our
@@ -875,9 +886,16 @@ public class TaskManager {
 closeTaskDirty(task);
 }
 }
+}
 
-if (processingMode == EXACTLY_ONCE_V2) {
-activeTaskCreator.reInitializeThreadProducer();
+private void removeLostTasksFromStateUpdater() {
+if (stateUpdater != null) {
+for (final Task restoringTask : stateUpdater.getTasks()) {
+if (restoringTask.isActive()) {
+tasks.addPendingTaskToCloseDirty(restoringTask.id());
+stateUpdater.remove(restoringTask.id());
+}
+}
 }
 }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 9628b42d92..8178fe3691 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -55,7 +55,9 @@ class Tasks {
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToUpdateInputPartitions = new HashMap<>();
     private final Set<Task> pendingTasksToInit = new HashSet<>();
-    private final Set<TaskId> pendingTasksToClose = new HashSet<>();
+    private final Set<TaskId> pendingTasksToCloseClean = new HashSet<>();
+
+    private final Set<TaskId> pendingTasksToCloseDirty = new HashSet<>();
 
     // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks
     private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>();
@@ -111,12 +113,19 @@ class Tasks {
 pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
 }
 
-boolean removePendingTaskToClose(final TaskId taskId) {
-

[kafka] branch trunk updated: MINOR: Fix invalid link to plugin.path property docs in quickstart (#12523)

2022-08-17 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ae3f48b699 MINOR: Fix invalid link to plugin.path property docs in quickstart (#12523)
ae3f48b699 is described below

commit ae3f48b699f8e101dc9bf53aaa8e341c581f37af
Author: Janik Dotzel 
AuthorDate: Wed Aug 17 19:25:35 2022 +0200

MINOR: Fix invalid link to plugin.path property docs in quickstart (#12523)

Reviewers: Chris Egerton 
---
 docs/quickstart.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/quickstart.html b/docs/quickstart.html
index 3a75211d0b..7867cb0b10 100644
--- a/docs/quickstart.html
+++ b/docs/quickstart.html
@@ -175,7 +175,7 @@ This is my second event
 
     First, make sure to add connect-file-{{fullDotVersion}}.jar to the plugin.path property in the Connect worker's configuration.
     For the purpose of this quickstart we'll use a relative path and consider the connectors' package as an uber jar, which works when the quickstart commands are run from the installation directory.
-    However, it's worth noting that for production deployments using absolute paths is always preferable. See plugin.path for a detailed description of how to set this config.
+    However, it's worth noting that for production deployments using absolute paths is always preferable. See plugin.path for a detailed description of how to set this config.
 
 
 



[kafka] branch trunk updated: KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap (#12277)

2022-08-17 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new d8e93c368d KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap (#12277)
d8e93c368d is described below

commit d8e93c368db2862502c8180e4d536edb20ac4d72
Author: Kvicii <42023367+kvi...@users.noreply.github.com>
AuthorDate: Thu Aug 18 00:32:24 2022 +0800

KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap (#12277)

Reviewers: Divij Vaidya , Chris Egerton 

---
 .../connect/runtime/WorkerConfigTransformer.java   | 25 +-
 1 file changed, 10 insertions(+), 15 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 4d9c4c1f74..989de85e0c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -78,25 +78,20 @@ public class WorkerConfigTransformer implements AutoCloseable {
     }
 
     private void scheduleReload(String connectorName, String path, long ttl) {
-        Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
-        if (connectorRequests == null) {
-            connectorRequests = new ConcurrentHashMap<>();
-            requests.put(connectorName, connectorRequests);
-        } else {
-            HerderRequest previousRequest = connectorRequests.get(path);
+        Map<String, HerderRequest> connectorRequests = requests.computeIfAbsent(connectorName, s -> new ConcurrentHashMap<>());
+        connectorRequests.compute(path, (s, previousRequest) -> {
             if (previousRequest != null) {
                 // Delete previous request for ttl which is now stale
                 previousRequest.cancel();
             }
-        }
-        log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
-        Callback<Void> cb = (error, result) -> {
-            if (error != null) {
-                log.error("Unexpected error during connector restart: ", error);
-            }
-        };
-        HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb);
-        connectorRequests.put(path, request);
+            log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
+            Callback<Void> cb = (error, result) -> {
+                if (error != null) {
+                    log.error("Unexpected error during connector restart: ", error);
+                }
+            };
+            return worker.herder().restartConnector(ttl, connectorName, cb);
+        });
     }
 
 @Override
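
Reduced to a standalone sketch, the point of the patch above: the old get-then-put sequence is two separate map operations and can interleave across threads, while `computeIfAbsent`/`compute` make the per-key read-modify-write atomic (names below are illustrative, not the connector runtime's):
```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ComputeIfAbsentSketch {
    private final Map<String, Map<String, Long>> requests = new ConcurrentHashMap<>();

    // Racy: two threads can both see null, both create an inner map, and one of the
    // inner maps (with whatever was scheduled in it) is silently lost.
    public void scheduleRacy(String connector, String path, long ttl) {
        Map<String, Long> perConnector = requests.get(connector);
        if (perConnector == null) {
            perConnector = new ConcurrentHashMap<>();
            requests.put(connector, perConnector);
        }
        perConnector.put(path, ttl);
    }

    // Atomic per key: computeIfAbsent installs the inner map exactly once, and compute
    // lets us replace (or cancel) the previous entry without a separate get.
    public void scheduleAtomic(String connector, String path, long ttl) {
        Map<String, Long> perConnector =
            requests.computeIfAbsent(connector, ignored -> new ConcurrentHashMap<>());
        perConnector.compute(path, (key, previous) -> {
            // previous would be cancelled here, as the real patch does with HerderRequest
            return ttl;
        });
    }

    public static void main(String[] args) {
        ComputeIfAbsentSketch sketch = new ComputeIfAbsentSketch();
        sketch.scheduleAtomic("file-source", "/tmp/secrets.properties", 30_000L);
        System.out.println(sketch.requests);
    }
}
```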