[kafka] branch 3.3 updated: KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.3 by this push:
     new 81c4426550 KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
81c4426550 is described below

commit 81c442655079355f859dcdb495df015a1a1f7baa
Author: Jason Gustafson
AuthorDate: Wed Aug 17 18:11:42 2022 -0700

    KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)

    There are a few cases in `ControllerApis` where we may see an `ApiException` wrapped as a `CompletionException`. This can happen in `QuorumController.allocateProducerIds`, where the returned future is the result of calling `thenApply` on the future passed to the controller. The danger when this happens is that the `CompletionException` gets passed to `Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`. At a minimum, I found that the `AllocateProducerIds` and `Update [...]

    Interestingly, `DeleteTopics` is not affected by this bug as I originally suspected. This is because we have logic in `ApiError.fromThrowable` to check for both `CompletionException` and `ExecutionException` and to pull out the underlying cause. This patch duplicates this logic from `ApiError.fromThrowable` into `Errors.forException` to be sure that we handle all cases where exceptions are converted to error codes.
Reviewers: David Arthur --- .../org/apache/kafka/common/protocol/Errors.java | 21 - .../requests/AllocateProducerIdsResponse.java | 4 + .../org/apache/kafka/common/requests/ApiError.java | 10 +-- .../apache/kafka/common/requests/ApiErrorTest.java | 6 +- .../main/scala/kafka/server/ControllerApis.scala | 7 +- .../test/junit/RaftClusterInvocationContext.java | 4 +- .../server/AllocateProducerIdsRequestTest.scala| 98 ++ .../unit/kafka/server/ControllerApisTest.scala | 78 - 8 files changed, 205 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 2ca42bafcf..c220bbcde4 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -132,6 +132,8 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.function.Function; /** @@ -469,7 +471,8 @@ public enum Errors { * If there are multiple matches in the class hierarchy, the first match starting from the bottom is used. */ public static Errors forException(Throwable t) { -Class clazz = t.getClass(); +Throwable cause = maybeUnwrapException(t); +Class clazz = cause.getClass(); while (clazz != null) { Errors error = classToError.get(clazz); if (error != null) @@ -479,6 +482,22 @@ public enum Errors { return UNKNOWN_SERVER_ERROR; } +/** + * Check if a Throwable is a commonly wrapped exception type (e.g. `CompletionException`) and return + * the cause if so. This is useful to handle cases where exceptions may be raised from a future or a + * completion stage (as might be the case for requests sent to the controller in `ControllerApis`). 
+ * + * @param t The Throwable to check + * @return The throwable itself or its cause if it is an instance of a commonly wrapped exception type + */ +public static Throwable maybeUnwrapException(Throwable t) { +if (t instanceof CompletionException || t instanceof ExecutionException) { +return t.getCause(); +} else { +return t; +} +} + private static String toHtml() { final StringBuilder b = new StringBuilder(); b.append("\n"); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java index 5d48c39e80..41db29158e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java @@ -56,6 +56,10 @@ public class AllocateProducerIdsResponse extends AbstractResponse { return data.throttleTimeMs(); } +public Errors error() { +return Errors.forCode(data.errorCode()); +} + public static AllocateProducerIdsResponse parse(ByteBuffer buffer, short version) { return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData( new
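The unwrapping logic this patch adds is easy to exercise outside the broker. Below is a minimal, self-contained sketch (plain Java, no Kafka classes) of the `maybeUnwrapException` helper together with the `thenApply` wrapping behavior it guards against; the class name is invented for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class UnwrapExample {
    // Mirrors the logic the patch adds to Errors.forException: peel off a
    // CompletionException/ExecutionException wrapper so the real cause is
    // what gets matched against the error-code table.
    static Throwable maybeUnwrapException(Throwable t) {
        if (t instanceof CompletionException || t instanceof ExecutionException) {
            return t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        // Calling thenApply on a future that completes exceptionally yields a
        // CompletionException wrapping the original cause -- the situation
        // described for QuorumController.allocateProducerIds.
        CompletableFuture<Integer> future = CompletableFuture
            .<Integer>failedFuture(new IllegalStateException("not the leader"))
            .thenApply(v -> v + 1);

        try {
            future.join();
        } catch (CompletionException e) {
            System.out.println(e.getClass().getSimpleName());                       // CompletionException
            System.out.println(maybeUnwrapException(e).getClass().getSimpleName()); // IllegalStateException
        }
    }
}
```

Without the unwrapping step, the wrapper class itself is looked up in the exception-to-error table, misses, and falls through to `UNKNOWN_SERVER_ERROR`.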
[kafka] branch trunk updated: KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new bc90c29faf KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
bc90c29faf is described below

commit bc90c29fafc69747daeecada8bb0c347e138edc8
Author: Jason Gustafson
AuthorDate: Wed Aug 17 18:11:42 2022 -0700

    KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)

    There are a few cases in `ControllerApis` where we may see an `ApiException` wrapped as a `CompletionException`. This can happen in `QuorumController.allocateProducerIds`, where the returned future is the result of calling `thenApply` on the future passed to the controller. The danger when this happens is that the `CompletionException` gets passed to `Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`. At a minimum, I found that the `AllocateProducerIds` and `Update [...]

    Interestingly, `DeleteTopics` is not affected by this bug as I originally suspected. This is because we have logic in `ApiError.fromThrowable` to check for both `CompletionException` and `ExecutionException` and to pull out the underlying cause. This patch duplicates this logic from `ApiError.fromThrowable` into `Errors.forException` to be sure that we handle all cases where exceptions are converted to error codes.
Reviewers: David Arthur --- .../org/apache/kafka/common/protocol/Errors.java | 21 - .../requests/AllocateProducerIdsResponse.java | 4 + .../org/apache/kafka/common/requests/ApiError.java | 10 +-- .../apache/kafka/common/requests/ApiErrorTest.java | 6 +- .../main/scala/kafka/server/ControllerApis.scala | 7 +- .../test/junit/RaftClusterInvocationContext.java | 4 +- .../server/AllocateProducerIdsRequestTest.scala| 98 ++ .../unit/kafka/server/ControllerApisTest.scala | 78 - 8 files changed, 205 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 2ca42bafcf..c220bbcde4 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -132,6 +132,8 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.function.Function; /** @@ -469,7 +471,8 @@ public enum Errors { * If there are multiple matches in the class hierarchy, the first match starting from the bottom is used. */ public static Errors forException(Throwable t) { -Class clazz = t.getClass(); +Throwable cause = maybeUnwrapException(t); +Class clazz = cause.getClass(); while (clazz != null) { Errors error = classToError.get(clazz); if (error != null) @@ -479,6 +482,22 @@ public enum Errors { return UNKNOWN_SERVER_ERROR; } +/** + * Check if a Throwable is a commonly wrapped exception type (e.g. `CompletionException`) and return + * the cause if so. This is useful to handle cases where exceptions may be raised from a future or a + * completion stage (as might be the case for requests sent to the controller in `ControllerApis`). 
+ * + * @param t The Throwable to check + * @return The throwable itself or its cause if it is an instance of a commonly wrapped exception type + */ +public static Throwable maybeUnwrapException(Throwable t) { +if (t instanceof CompletionException || t instanceof ExecutionException) { +return t.getCause(); +} else { +return t; +} +} + private static String toHtml() { final StringBuilder b = new StringBuilder(); b.append("\n"); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java index 5d48c39e80..41db29158e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java @@ -56,6 +56,10 @@ public class AllocateProducerIdsResponse extends AbstractResponse { return data.throttleTimeMs(); } +public Errors error() { +return Errors.forCode(data.errorCode()); +} + public static AllocateProducerIdsResponse parse(ByteBuffer buffer, short version) { return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData( new
[kafka] branch 3.3 updated: KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.3 by this push:
     new 0ac9c3498c KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)
0ac9c3498c is described below

commit 0ac9c3498c761167db1ead46d89dea71b29981d6
Author: Jason Gustafson
AuthorDate: Wed Aug 17 15:48:32 2022 -0700

    KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)

    Currently the server returns `INVALID_REQUEST` if a `DescribeQuorum` request is sent to a node that is not the current leader. Besides being inconsistent with all of the other leader APIs in the raft layer, this error is treated as fatal by both the forwarding manager and the admin client. Instead, we should return `NOT_LEADER_OR_FOLLOWER` as we do with the other APIs. This error is retriable, so we can rely on the admin client to retry the request after seeing it.
Reviewers: David Jacot --- .../common/requests/DescribeQuorumResponse.java| 16 +++- .../kafka/clients/admin/KafkaAdminClientTest.java | 23 + .../org/apache/kafka/raft/KafkaRaftClient.java | 5 +++- .../org/apache/kafka/raft/KafkaRaftClientTest.java | 29 ++ .../apache/kafka/raft/RaftClientTestContext.java | 27 ++-- 5 files changed, 85 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java index cbf945b704..06ae681bc5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java @@ -37,7 +37,7 @@ import java.util.Map; * - {@link Errors#BROKER_NOT_AVAILABLE} * * Partition level errors: - * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#NOT_LEADER_OR_FOLLOWER} * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} */ public class DescribeQuorumResponse extends AbstractResponse { @@ -72,6 +72,19 @@ public class DescribeQuorumResponse extends AbstractResponse { return DEFAULT_THROTTLE_TIME; } +public static DescribeQuorumResponseData singletonErrorResponse( +TopicPartition topicPartition, +Errors error +) { +return new DescribeQuorumResponseData() +.setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData() +.setTopicName(topicPartition.topic()) +.setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData() +.setPartitionIndex(topicPartition.partition()) +.setErrorCode(error.code()); +} + + public static DescribeQuorumResponseData singletonResponse(TopicPartition topicPartition, int leaderId, int leaderEpoch, @@ -82,6 +95,7 @@ public class DescribeQuorumResponse extends AbstractResponse { .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData() .setTopicName(topicPartition.topic()) .setPartitions(Collections.singletonList(new 
DescribeQuorumResponseData.PartitionData() +.setPartitionIndex(topicPartition.partition()) .setErrorCode(Errors.NONE.code()) .setLeaderId(leaderId) .setLeaderEpoch(leaderEpoch) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index de57813679..5faf53f075 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -5192,6 +5192,29 @@ public class KafkaAdminClientTest { } } +@Test +public void testDescribeMetadataQuorumRetriableError() throws Exception { +try (final AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id, +ApiKeys.DESCRIBE_QUORUM.oldestVersion(), +ApiKeys.DESCRIBE_QUORUM.latestVersion())); + +// First request fails with a NOT_LEADER_OR_FOLLOWER error (which is retriable) +env.kafkaClient().prepareResponse( +body -> body instanceof DescribeQuorumRequest, +prepareDescribeQuorumResponse(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, false, false, false, false, false)); + +// The second request succeeds +
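To see why a retriable error code matters to callers, here is a minimal sketch of the retry behavior a client relies on. The `RetriableException` class and `callWithRetries` helper below are hypothetical stand-ins written for this example, not Kafka's real client code (Kafka's actual marker type lives in `org.apache.kafka.common.errors`); a fatal error would propagate on the first attempt, while a retriable one is retried:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryExample {
    // Hypothetical marker for errors a client may safely retry.
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    static <T> T callWithRetries(Supplier<T> call, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get();
            } catch (RetriableException e) {
                if (attempt >= maxAttempts) {
                    throw e; // retries exhausted
                }
                // Retriable: loop and try again. A real client would back off
                // and refresh metadata to locate the new leader first.
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // First attempt hits a non-leader and fails retriably; second succeeds.
        Supplier<String> describeQuorum = () -> {
            if (calls.incrementAndGet() == 1) {
                throw new RetriableException("NOT_LEADER_OR_FOLLOWER");
            }
            return "leaderId=1";
        };
        System.out.println(callWithRetries(describeQuorum, 3)); // prints leaderId=1
    }
}
```

Had the error been classified as fatal (as `INVALID_REQUEST` was), the first failure would surface to the caller instead of being absorbed by the retry loop.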
[kafka] branch trunk updated: KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new e5b865d6bf KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)
e5b865d6bf is described below

commit e5b865d6bf37495e9949878c8206b9459aa5e1f4
Author: Jason Gustafson
AuthorDate: Wed Aug 17 15:48:32 2022 -0700

    KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)

    Currently the server returns `INVALID_REQUEST` if a `DescribeQuorum` request is sent to a node that is not the current leader. Besides being inconsistent with all of the other leader APIs in the raft layer, this error is treated as fatal by both the forwarding manager and the admin client. Instead, we should return `NOT_LEADER_OR_FOLLOWER` as we do with the other APIs. This error is retriable, so we can rely on the admin client to retry the request after seeing it.
Reviewers: David Jacot --- .../common/requests/DescribeQuorumResponse.java| 16 +++- .../kafka/clients/admin/KafkaAdminClientTest.java | 23 + .../org/apache/kafka/raft/KafkaRaftClient.java | 5 +++- .../org/apache/kafka/raft/KafkaRaftClientTest.java | 29 ++ .../apache/kafka/raft/RaftClientTestContext.java | 27 ++-- 5 files changed, 85 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java index cbf945b704..06ae681bc5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java @@ -37,7 +37,7 @@ import java.util.Map; * - {@link Errors#BROKER_NOT_AVAILABLE} * * Partition level errors: - * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#NOT_LEADER_OR_FOLLOWER} * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} */ public class DescribeQuorumResponse extends AbstractResponse { @@ -72,6 +72,19 @@ public class DescribeQuorumResponse extends AbstractResponse { return DEFAULT_THROTTLE_TIME; } +public static DescribeQuorumResponseData singletonErrorResponse( +TopicPartition topicPartition, +Errors error +) { +return new DescribeQuorumResponseData() +.setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData() +.setTopicName(topicPartition.topic()) +.setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData() +.setPartitionIndex(topicPartition.partition()) +.setErrorCode(error.code()); +} + + public static DescribeQuorumResponseData singletonResponse(TopicPartition topicPartition, int leaderId, int leaderEpoch, @@ -82,6 +95,7 @@ public class DescribeQuorumResponse extends AbstractResponse { .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData() .setTopicName(topicPartition.topic()) .setPartitions(Collections.singletonList(new 
DescribeQuorumResponseData.PartitionData() +.setPartitionIndex(topicPartition.partition()) .setErrorCode(Errors.NONE.code()) .setLeaderId(leaderId) .setLeaderEpoch(leaderEpoch) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index de57813679..5faf53f075 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -5192,6 +5192,29 @@ public class KafkaAdminClientTest { } } +@Test +public void testDescribeMetadataQuorumRetriableError() throws Exception { +try (final AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id, +ApiKeys.DESCRIBE_QUORUM.oldestVersion(), +ApiKeys.DESCRIBE_QUORUM.latestVersion())); + +// First request fails with a NOT_LEADER_OR_FOLLOWER error (which is retriable) +env.kafkaClient().prepareResponse( +body -> body instanceof DescribeQuorumRequest, +prepareDescribeQuorumResponse(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, false, false, false, false, false)); + +// The second request succeeds +
[kafka] branch trunk updated: HOTFIX: Revert KAFKA-10199 which is causing compilation failures (#12532)
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 0243bb98a7 HOTFIX: Revert KAFKA-10199 which is causing compilation failures (#12532)
0243bb98a7 is described below

commit 0243bb98a7cc6bc4ed3c8f39b3a1ed9f70d0f8bd
Author: Jason Gustafson
AuthorDate: Wed Aug 17 14:29:49 2022 -0700

    HOTFIX: Revert KAFKA-10199 which is causing compilation failures (#12532)

    Compilation is failing after these two commits:

    ```
    > Task :streams:compileJava
    /Users/jgustafson/Projects/kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:852: error: cannot find symbol
        tasks.addPendingTaskToClose(restoringTask.id());
             ^
      symbol:   method addPendingTaskToClose(org.apache.kafka.streams.processor.TaskId)
      location: variable tasks of type org.apache.kafka.streams.processor.internals.Tasks
    1 error
    ```

    Also here:

    ```
    [2022-08-17T20:58:20.912Z] > Task :streams:compileTestJava
    [2022-08-17T20:58:20.912Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12530/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:822: error: method setupForRevocation(Set,Set) is already defined in class TaskManagerTest
    [2022-08-17T20:58:20.912Z]     private TaskManager setupForRevocation(final Set tasksInStateUpdater,
    ```

    This patch reverts them.
Reviewers: Ismael Juma --- .../streams/processor/internals/TaskManager.java | 42 +-- .../kafka/streams/processor/internals/Tasks.java | 19 +-- .../processor/internals/TaskManagerTest.java | 129 + 3 files changed, 12 insertions(+), 178 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index bab05a5184..03c36b0daf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -465,7 +465,7 @@ public class TaskManager { standbyTasksToCreate.remove(taskId); } else { stateUpdater.remove(taskId); -tasks.addPendingTaskToCloseClean(taskId); +tasks.addPendingTaskToClose(taskId); } } } @@ -692,7 +692,7 @@ public class TaskManager { taskExceptions.putIfAbsent(taskId, e); } -} else if (tasks.removePendingTaskToCloseClean(task.id())) { +} else if (tasks.removePendingTaskToClose(task.id())) { try { task.suspend(); task.closeClean(); @@ -710,8 +710,6 @@ public class TaskManager { taskExceptions.putIfAbsent(task.id(), e); } -} else if (tasks.removePendingTaskToCloseDirty(task.id())) { -tasksToCloseDirty.add(task); } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) { task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); stateUpdater.add(task); @@ -757,8 +755,6 @@ public class TaskManager { } } -removeRevokedTasksFromStateUpdater(remainingRevokedPartitions); - if (!remainingRevokedPartitions.isEmpty()) { log.debug("The following revoked partitions {} are missing from the current task partitions. 
It could " + "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " + @@ -844,20 +840,6 @@ public class TaskManager { } } -private void removeRevokedTasksFromStateUpdater(final Set remainingRevokedPartitions) { -if (stateUpdater != null) { -for (final Task restoringTask : stateUpdater.getTasks()) { -if (restoringTask.isActive()) { -if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) { -tasks.addPendingTaskToClose(restoringTask.id()); -stateUpdater.remove(restoringTask.id()); - remainingRevokedPartitions.removeAll(restoringTask.inputPartitions()); -} -} -} -} -} - private void prepareCommitAndAddOffsetsToMap(final Set tasksToPrepare, final Map> consumedOffsetsPerTask) { for (final Task task : tasksToPrepare) { @@ -885,15 +867,6 @@ public class TaskManager
[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on revocation (#12520)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b47c4d8598 KAFKA-10199: Remove tasks from state updater on revocation (#12520) b47c4d8598 is described below commit b47c4d859805068de6a8fe8de3bda5e7a21132e2 Author: Bruno Cadonna AuthorDate: Wed Aug 17 20:13:34 2022 +0200 KAFKA-10199: Remove tasks from state updater on revocation (#12520) Removes tasks from the state updater when the input partitions of the tasks are revoked during a rebalance. Reviewers: Guozhang Wang --- .../streams/processor/internals/TaskManager.java | 16 + .../processor/internals/TaskManagerTest.java | 81 ++ 2 files changed, 97 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 4bba28a3f3..bab05a5184 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -757,6 +757,8 @@ public class TaskManager { } } +removeRevokedTasksFromStateUpdater(remainingRevokedPartitions); + if (!remainingRevokedPartitions.isEmpty()) { log.debug("The following revoked partitions {} are missing from the current task partitions. 
It could " + "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " + @@ -842,6 +844,20 @@ public class TaskManager { } } +private void removeRevokedTasksFromStateUpdater(final Set remainingRevokedPartitions) { +if (stateUpdater != null) { +for (final Task restoringTask : stateUpdater.getTasks()) { +if (restoringTask.isActive()) { +if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) { +tasks.addPendingTaskToClose(restoringTask.id()); +stateUpdater.remove(restoringTask.id()); + remainingRevokedPartitions.removeAll(restoringTask.inputPartitions()); +} +} +} +} +} + private void prepareCommitAndAddOffsetsToMap(final Set tasksToPrepare, final Map> consumedOffsetsPerTask) { for (final Task task : tasksToPrepare) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 133541bfac..ff52ad5ae9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -751,6 +751,87 @@ public class TaskManagerTest { assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums)); } +@Test +public void shouldRemoveStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() { +final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId00Partitions).build(); +final TaskManager taskManager = setupForRevocation(mkSet(task), mkSet(task)); + +taskManager.handleRevocation(taskId00Partitions); + +Mockito.verify(stateUpdater).remove(task.id()); + +taskManager.tryToCompleteRestoration(time.milliseconds(), null); + +Mockito.verify(task).closeClean(); +} + +public void shouldRemoveMultipleStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() { +final StreamTask 
task1 = statefulTask(taskId00, taskId00ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId00Partitions).build(); +final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId01Partitions).build(); +final TaskManager taskManager = setupForRevocation(mkSet(task1, task2), mkSet(task1, task2)); + +taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions)); + +Mockito.verify(stateUpdater).remove(task1.id()); +Mockito.verify(stateUpdater).remove(task2.id()); + +taskManager.tryToCompleteRestoration(time.milliseconds(), null); + +Mockito.verify(task1).closeClean(); +Mockito.verify(task2).closeClean(); +} + +@Test +public void shouldNotRemoveStatefulTaskWithoutRevokedInputPartitionsFromStateUpdaterOnRevocation() { +
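The revocation handling in this commit can be sketched with plain collections. The `Task` record and the method below are simplified stand-ins for the Streams internals (`Task`, `Tasks`, `StateUpdater`), assuming only what the diff shows: an active restoring task is pulled out of the state updater and marked for closing when all of its input partitions were revoked:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RevocationExample {
    // Minimal stand-in for a restoring task; the real Streams Task interface
    // is far richer than this sketch.
    record Task(String id, boolean active, Set<Integer> inputPartitions) {}

    static List<String> removeRevokedTasksFromStateUpdater(
            List<Task> restoringTasks, Set<Integer> remainingRevokedPartitions) {
        List<String> pendingClose = new ArrayList<>();
        for (Task task : restoringTasks) {
            // Only active tasks whose input partitions were ALL revoked are
            // removed from the state updater and queued for closing.
            if (task.active() && remainingRevokedPartitions.containsAll(task.inputPartitions())) {
                pendingClose.add(task.id());
                remainingRevokedPartitions.removeAll(task.inputPartitions());
            }
        }
        return pendingClose;
    }

    public static void main(String[] args) {
        List<Task> restoring = List.of(
            new Task("0_0", true, Set.of(0)),
            new Task("0_1", true, Set.of(1)));
        Set<Integer> revoked = new HashSet<>(Set.of(0)); // only partition 0 revoked

        System.out.println(removeRevokedTasksFromStateUpdater(restoring, revoked)); // [0_0]
        System.out.println(revoked.isEmpty()); // true: revoked partitions accounted for
    }
}
```

Shrinking `remainingRevokedPartitions` as tasks are matched is what lets the caller log only the genuinely unaccounted-for partitions afterwards.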
[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on partition lost (#12521)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9f20f89953 KAFKA-10199: Remove tasks from state updater on partition lost (#12521) 9f20f89953 is described below commit 9f20f8995399d9e03f518f7b9c8be2bffb2fdcfc Author: Bruno Cadonna AuthorDate: Wed Aug 17 20:12:30 2022 +0200 KAFKA-10199: Remove tasks from state updater on partition lost (#12521) Removes tasks from the state updater when the input partitions of the tasks are lost during a rebalance. Reviewers: Guozhang Wang --- .../streams/processor/internals/TaskManager.java | 26 ++-- .../kafka/streams/processor/internals/Tasks.java | 19 ++--- .../processor/internals/TaskManagerTest.java | 48 -- 3 files changed, 81 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 03c36b0daf..4bba28a3f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -465,7 +465,7 @@ public class TaskManager { standbyTasksToCreate.remove(taskId); } else { stateUpdater.remove(taskId); -tasks.addPendingTaskToClose(taskId); +tasks.addPendingTaskToCloseClean(taskId); } } } @@ -692,7 +692,7 @@ public class TaskManager { taskExceptions.putIfAbsent(taskId, e); } -} else if (tasks.removePendingTaskToClose(task.id())) { +} else if (tasks.removePendingTaskToCloseClean(task.id())) { try { task.suspend(); task.closeClean(); @@ -710,6 +710,8 @@ public class TaskManager { taskExceptions.putIfAbsent(task.id(), e); } +} else if (tasks.removePendingTaskToCloseDirty(task.id())) { +tasksToCloseDirty.add(task); } else if ((inputPartitions = 
tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) { task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); stateUpdater.add(task); @@ -867,6 +869,15 @@ public class TaskManager { void handleLostAll() { log.debug("Closing lost active tasks as zombies."); +closeRunningTasksDirty(); +removeLostTasksFromStateUpdater(); + +if (processingMode == EXACTLY_ONCE_V2) { +activeTaskCreator.reInitializeThreadProducer(); +} +} + +private void closeRunningTasksDirty() { final Set allTask = tasks.allTasks(); for (final Task task : allTask) { // Even though we've apparently dropped out of the group, we can continue safely to maintain our @@ -875,9 +886,16 @@ public class TaskManager { closeTaskDirty(task); } } +} -if (processingMode == EXACTLY_ONCE_V2) { -activeTaskCreator.reInitializeThreadProducer(); +private void removeLostTasksFromStateUpdater() { +if (stateUpdater != null) { +for (final Task restoringTask : stateUpdater.getTasks()) { +if (restoringTask.isActive()) { +tasks.addPendingTaskToCloseDirty(restoringTask.id()); +stateUpdater.remove(restoringTask.id()); +} +} } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index 9628b42d92..8178fe3691 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -55,7 +55,9 @@ class Tasks { private final Map> pendingTasksToRecycle = new HashMap<>(); private final Map> pendingTasksToUpdateInputPartitions = new HashMap<>(); private final Set pendingTasksToInit = new HashSet<>(); -private final Set pendingTasksToClose = new HashSet<>(); +private final Set pendingTasksToCloseClean = new HashSet<>(); + +private final Set pendingTasksToCloseDirty = new HashSet<>(); // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask 
with mocks private final Map activeTasksPerPartition = new HashMap<>(); @@ -111,12 +113,19 @@ class Tasks { pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions); } -boolean removePendingTaskToClose(final TaskId taskId) { -
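The split of the single `pendingTasksToClose` set into clean and dirty variants can be illustrated with a small sketch. The field names mirror those in `Tasks`, but the surrounding bookkeeping is simplified and hypothetical; the point is that `Set.remove` returns true only when the id was present, so each removed task is dispatched to exactly one close path:

```java
import java.util.HashSet;
import java.util.Set;

public class PendingCloseExample {
    // Sketch of the bookkeeping this commit introduces: one pending set per
    // close mode instead of a single pendingTasksToClose set.
    static final Set<String> pendingTasksToCloseClean = new HashSet<>();
    static final Set<String> pendingTasksToCloseDirty = new HashSet<>();

    static String handleRemovedTask(String taskId) {
        if (pendingTasksToCloseClean.remove(taskId)) {
            return "closeClean";   // e.g. revoked: state can be committed first
        } else if (pendingTasksToCloseDirty.remove(taskId)) {
            return "closeDirty";   // e.g. partitions lost: close as a zombie
        }
        return "keep";
    }

    public static void main(String[] args) {
        pendingTasksToCloseClean.add("0_0"); // queued on revocation
        pendingTasksToCloseDirty.add("0_1"); // queued on partition loss

        System.out.println(handleRemovedTask("0_0")); // closeClean
        System.out.println(handleRemovedTask("0_1")); // closeDirty
        System.out.println(handleRemovedTask("0_2")); // keep
    }
}
```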
[kafka] branch trunk updated: MINOR: Fix invalid link to plugin.path property docs in quickstart (#12523)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ae3f48b699 MINOR: Fix invalid link to plugin.path property docs in quickstart (#12523) ae3f48b699 is described below commit ae3f48b699f8e101dc9bf53aaa8e341c581f37af Author: Janik Dotzel AuthorDate: Wed Aug 17 19:25:35 2022 +0200 MINOR: Fix invalid link to plugin.path property docs in quickstart (#12523) Reviewers: Chris Egerton --- docs/quickstart.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/quickstart.html b/docs/quickstart.html index 3a75211d0b..7867cb0b10 100644 --- a/docs/quickstart.html +++ b/docs/quickstart.html @@ -175,7 +175,7 @@ This is my second event First, make sure to add connect-file-{{fullDotVersion}}.jar to the plugin.path property in the Connect worker's configuration. For the purpose of this quickstart we'll use a relative path and consider the connectors' package as an uber jar, which works when the quickstart commands are run from the installation directory. -However, it's worth noting that for production deployments using absolute paths is always preferable. See plugin.path for a detailed description of how to set this config. +However, it's worth noting that for production deployments using absolute paths is always preferable. See plugin.path for a detailed description of how to set this config.
[kafka] branch trunk updated: KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap (#12277)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d8e93c368d KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap (#12277) d8e93c368d is described below commit d8e93c368db2862502c8180e4d536edb20ac4d72 Author: Kvicii <42023367+kvi...@users.noreply.github.com> AuthorDate: Thu Aug 18 00:32:24 2022 +0800 KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap (#12277) Reviewers: Divij Vaidya , Chris Egerton --- .../connect/runtime/WorkerConfigTransformer.java | 25 +- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java index 4d9c4c1f74..989de85e0c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -78,25 +78,20 @@ public class WorkerConfigTransformer implements AutoCloseable { } private void scheduleReload(String connectorName, String path, long ttl) { -Map connectorRequests = requests.get(connectorName); -if (connectorRequests == null) { -connectorRequests = new ConcurrentHashMap<>(); -requests.put(connectorName, connectorRequests); -} else { -HerderRequest previousRequest = connectorRequests.get(path); +Map connectorRequests = requests.computeIfAbsent(connectorName, s -> new ConcurrentHashMap<>()); +connectorRequests.compute(path, (s, previousRequest) -> { if (previousRequest != null) { // Delete previous request for ttl which is now stale previousRequest.cancel(); } -} -log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl); -Callback cb = (error, result) -> { 
-if (error != null) { -log.error("Unexpected error during connector restart: ", error); -} -}; -HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb); -connectorRequests.put(path, request); +log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl); +Callback cb = (error, result) -> { +if (error != null) { +log.error("Unexpected error during connector restart: ", error); +} +}; +return worker.herder().restartConnector(ttl, connectorName, cb); +}); } @Override
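The fix above replaces a non-atomic check-then-act sequence with `ConcurrentHashMap.computeIfAbsent` and `compute`, whose remapping functions are applied atomically for the given key. A minimal standalone illustration of the pattern (integer counters stand in for the real `HerderRequest` objects):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AtomicMapExample {
    public static void main(String[] args) {
        Map<String, Integer> requests = new ConcurrentHashMap<>();

        // Racy check-then-act, the shape the commit removes: another thread
        // can insert between the get() and the put(), losing one update.
        //   Integer prev = requests.get("path");
        //   if (prev == null) requests.put("path", 1);

        // Atomic replacement: the mapping function runs at most once and no
        // other update to this key can interleave with it.
        requests.computeIfAbsent("path", k -> 0);

        // compute() receives the previous value (or null), letting the caller
        // cancel/replace a stale entry in one atomic step, as scheduleReload
        // now does with the previous HerderRequest.
        requests.compute("path", (k, prev) -> {
            // in WorkerConfigTransformer this is where previousRequest.cancel()
            // happens before the replacement request is scheduled
            return prev == null ? 1 : prev + 1;
        });

        System.out.println(requests.get("path")); // 1
    }
}
```

One caveat the javadoc calls out: the remapping functions should be short and must not try to update other mappings of the same map, since they may run while the key's bin is locked.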