[kafka] branch trunk updated: HOTFIX: KIP-851, rename requireStable in ListConsumerGroupOffsetsOptions
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ca8135b242a HOTFIX: KIP-851, rename requireStable in ListConsumerGroupOffsetsOptions ca8135b242a is described below commit ca8135b242a5d5fd71e5edffbcb85ada2f5d9bd2 Author: Guozhang Wang AuthorDate: Wed Jul 6 22:00:31 2022 -0700 HOTFIX: KIP-851, rename requireStable in ListConsumerGroupOffsetsOptions --- .../src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 2 +- .../org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 99fd98b443f..aad2610c94a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3404,7 +3404,7 @@ public class KafkaAdminClient extends AdminClient { final ListConsumerGroupOffsetsOptions options) { SimpleAdminApiFuture> future = ListConsumerGroupOffsetsHandler.newFuture(groupId); -ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), options.shouldRequireStable(), logContext); +ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), options.requireStable(), logContext); invokeDriver(handler, future, options.timeoutMs); return new ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId))); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java index 
a06feaf3b38..292a47ef393 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java @@ -60,7 +60,7 @@ public class ListConsumerGroupOffsetsOptions extends AbstractOptions
[kafka] branch trunk updated (6495a0768ca -> 915c7812439)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 6495a0768ca KAFKA-14032; Dequeue time for forwarded requests is unset (#12360) add 915c7812439 KAFKA-10199: Remove main consumer from store changelog reader (#12337) No new revisions were added by this update. Summary of changes: .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../admin/ListConsumerGroupOffsetsOptions.java | 13 .../internals/ListConsumerGroupOffsetsHandler.java | 15 - .../kafka/clients/admin/AdminClientTestUtils.java | 7 +++ .../kafka/clients/admin/KafkaAdminClientTest.java | 33 ++ .../kafka/clients/admin/MockAdminClient.java | 38 ++-- .../processor/internals/DefaultStateUpdater.java | 3 +- .../processor/internals/StoreChangelogReader.java | 71 -- .../streams/processor/internals/StreamThread.java | 1 - .../internals/DefaultStateUpdaterTest.java | 1 + .../processor/internals/StandbyTaskTest.java | 8 ++- .../internals/StoreChangelogReaderTest.java| 50 ++- .../processor/internals/StreamTaskTest.java| 27 +++- 13 files changed, 188 insertions(+), 81 deletions(-)
[kafka] branch trunk updated: KAFKA-14032; Dequeue time for forwarded requests is unset (#12360)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6495a0768ca KAFKA-14032; Dequeue time for forwarded requests is unset (#12360) 6495a0768ca is described below commit 6495a0768cae1086d4b1dfa466967dfe178c1553 Author: YU AuthorDate: Thu Jul 7 04:21:28 2022 +0800 KAFKA-14032; Dequeue time for forwarded requests is unset (#12360) When building a forwarded request, we need to override the dequeue time of the underlying request to match the same value as the envelope. Otherwise, the field is left unset, which causes inaccurate reporting. Reviewers: Jason Gustafson --- core/src/main/scala/kafka/server/EnvelopeUtils.scala | 5 - core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 10 +++--- core/src/test/scala/unit/kafka/utils/TestUtils.scala | 5 - 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala b/core/src/main/scala/kafka/server/EnvelopeUtils.scala index a162ae5fe80..97c532ebb45 100644 --- a/core/src/main/scala/kafka/server/EnvelopeUtils.scala +++ b/core/src/main/scala/kafka/server/EnvelopeUtils.scala @@ -84,7 +84,7 @@ object EnvelopeUtils { requestChannelMetrics: RequestChannel.Metrics ): RequestChannel.Request = { try { - new RequestChannel.Request( + val forwardedRequest = new RequestChannel.Request( processor = envelope.processor, context = forwardedContext, startTimeNanos = envelope.startTimeNanos, @@ -93,6 +93,9 @@ object EnvelopeUtils { requestChannelMetrics, Some(envelope) ) + // set the dequeue time of forwardedRequest as the value of envelope request + forwardedRequest.requestDequeueTimeNanos = envelope.requestDequeueTimeNanos + forwardedRequest } catch { case e: InvalidRequestException => // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed. 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index cc34cabe05a..d176f369f8d 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -307,8 +307,10 @@ class KafkaApisTest { Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava)) val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion) +val startTimeNanos = time.nanoseconds() +val queueDurationNanos = 5 * 1000 * 1000 val request = TestUtils.buildEnvelopeRequest( - alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds()) + alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, startTimeNanos, startTimeNanos + queueDurationNanos) val capturedResponse: ArgumentCaptor[AlterConfigsResponse] = ArgumentCaptor.forClass(classOf[AlterConfigsResponse]) val capturedRequest: ArgumentCaptor[RequestChannel.Request] = ArgumentCaptor.forClass(classOf[RequestChannel.Request]) @@ -321,6 +323,8 @@ class KafkaApisTest { any() ) assertEquals(Some(request), capturedRequest.getValue.envelope) +// the dequeue time of forwarded request should equals to envelop request +assertEquals(request.requestDequeueTimeNanos, capturedRequest.getValue.requestDequeueTimeNanos) val innerResponse = capturedResponse.getValue val responseMap = innerResponse.data.responses().asScala.map { resourceResponse => resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode) @@ -397,7 +401,7 @@ class KafkaApisTest { .build(requestHeader.apiVersion) val request = TestUtils.buildEnvelopeRequest( - alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds(), fromPrivilegedListener) + alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds(), fromPrivilegedListener = fromPrivilegedListener) val capturedResponse: 
ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse]) createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching) @@ -1614,7 +1618,7 @@ class KafkaApisTest { assertEquals(1, response.data.responses.size) val topicProduceResponse = response.data.responses.asScala.head - assertEquals(1, topicProduceResponse.partitionResponses.size) + assertEquals(1, topicProduceResponse.partitionResponses.size) val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head assertEquals(Errors.INVALID_PRODUCER_EPOCH, Errors.forCode(partitionProduceResponse.errorCode)) } diff --git
[kafka] branch trunk updated: KAFKA-10199: Remove call to Task#completeRestoration from state updater (#12379)
This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 00f395bb889 KAFKA-10199: Remove call to Task#completeRestoration from state updater (#12379) 00f395bb889 is described below commit 00f395bb889252e3efb62cedfaa1209b58d5fd3c Author: Bruno Cadonna AuthorDate: Wed Jul 6 12:36:15 2022 +0200 KAFKA-10199: Remove call to Task#completeRestoration from state updater (#12379) The call to Task#completeRestoration calls methods on the main consumer. The state updater thread should not access the main consumer since the main consumer is not thread-safe. Additionally, Task#completeRestoration changed the state of active tasks, but we decided to keep task life cycle management outside of the state updater. Task#completeRestoration should be called by the stream thread on restored active tasks returned by the state updater. 
Reviewer: Guozhang Wang --- .../processor/internals/DefaultStateUpdater.java | 13 ++--- .../internals/DefaultStateUpdaterTest.java | 31 +- 2 files changed, 9 insertions(+), 35 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 886a37b3140..3969a911f33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -60,16 +59,12 @@ public class DefaultStateUpdater implements StateUpdater { private final ChangelogReader changelogReader; private final AtomicBoolean isRunning = new AtomicBoolean(true); -private final Consumer> offsetResetter; private final Map updatingTasks = new ConcurrentHashMap<>(); private final Logger log; -public StateUpdaterThread(final String name, - final ChangelogReader changelogReader, - final Consumer> offsetResetter) { +public StateUpdaterThread(final String name, final ChangelogReader changelogReader) { super(name); this.changelogReader = changelogReader; -this.offsetResetter = offsetResetter; final String logPrefix = String.format("%s ", name); final LogContext logContext = new LogContext(logPrefix); @@ -286,7 +281,6 @@ public class DefaultStateUpdater implements StateUpdater { final Set restoredChangelogs) { final Collection taskChangelogPartitions = task.changelogPartitions(); if (restoredChangelogs.containsAll(taskChangelogPartitions)) { -task.completeRestoration(offsetResetter); 
task.maybeCheckpoint(true); addToRestoredTasks(task); updatingTasks.remove(task.id()); @@ -332,7 +326,6 @@ public class DefaultStateUpdater implements StateUpdater { private final Time time; private final ChangelogReader changelogReader; -private final Consumer> offsetResetter; private final Queue tasksAndActions = new LinkedList<>(); private final Lock tasksAndActionsLock = new ReentrantLock(); private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition(); @@ -350,17 +343,15 @@ public class DefaultStateUpdater implements StateUpdater { public DefaultStateUpdater(final StreamsConfig config, final ChangelogReader changelogReader, - final Consumer> offsetResetter, final Time time) { this.changelogReader = changelogReader; -this.offsetResetter = offsetResetter; this.time = time; this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); } public void start() { if (stateUpdaterThread == null) { -stateUpdaterThread = new StateUpdaterThread("state-updater", changelogReader, offsetResetter); +stateUpdaterThread = new StateUpdaterThread("state-updater", changelogReader); stateUpdaterThread.start(); shutdownGate = new CountDownLatch(1); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java