[kafka] branch trunk updated: HOTFIX: KIP-851, rename requireStable in ListConsumerGroupOffsetsOptions

2022-07-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ca8135b242a HOTFIX: KIP-851, rename requireStable in 
ListConsumerGroupOffsetsOptions
ca8135b242a is described below

commit ca8135b242a5d5fd71e5edffbcb85ada2f5d9bd2
Author: Guozhang Wang 
AuthorDate: Wed Jul 6 22:00:31 2022 -0700

HOTFIX: KIP-851, rename requireStable in ListConsumerGroupOffsetsOptions
---
 .../src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java  | 2 +-
 .../org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 99fd98b443f..aad2610c94a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3404,7 +3404,7 @@ public class KafkaAdminClient extends AdminClient {
final 
ListConsumerGroupOffsetsOptions options) {
 SimpleAdminApiFuture> future =
 ListConsumerGroupOffsetsHandler.newFuture(groupId);
-ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), 
options.shouldRequireStable(), logContext);
+ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), 
options.requireStable(), logContext);
 invokeDriver(handler, future, options.timeoutMs);
 return new 
ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
index a06feaf3b38..292a47ef393 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
@@ -60,7 +60,7 @@ public class ListConsumerGroupOffsetsOptions extends 
AbstractOptions

[kafka] branch trunk updated (6495a0768ca -> 915c7812439)

2022-07-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 6495a0768ca KAFKA-14032; Dequeue time for forwarded requests is unset 
(#12360)
 add 915c7812439 KAFKA-10199: Remove main consumer from store changelog 
reader (#12337)

No new revisions were added by this update.

Summary of changes:
 .../kafka/clients/admin/KafkaAdminClient.java  |  2 +-
 .../admin/ListConsumerGroupOffsetsOptions.java | 13 
 .../internals/ListConsumerGroupOffsetsHandler.java | 15 -
 .../kafka/clients/admin/AdminClientTestUtils.java  |  7 +++
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 33 ++
 .../kafka/clients/admin/MockAdminClient.java   | 38 ++--
 .../processor/internals/DefaultStateUpdater.java   |  3 +-
 .../processor/internals/StoreChangelogReader.java  | 71 --
 .../streams/processor/internals/StreamThread.java  |  1 -
 .../internals/DefaultStateUpdaterTest.java |  1 +
 .../processor/internals/StandbyTaskTest.java   |  8 ++-
 .../internals/StoreChangelogReaderTest.java| 50 ++-
 .../processor/internals/StreamTaskTest.java| 27 +++-
 13 files changed, 188 insertions(+), 81 deletions(-)



[kafka] branch trunk updated: KAFKA-14032; Dequeue time for forwarded requests is unset (#12360)

2022-07-06 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 6495a0768ca KAFKA-14032; Dequeue time for forwarded requests is unset 
(#12360)
6495a0768ca is described below

commit 6495a0768cae1086d4b1dfa466967dfe178c1553
Author: YU 
AuthorDate: Thu Jul 7 04:21:28 2022 +0800

KAFKA-14032; Dequeue time for forwarded requests is unset (#12360)

When building a forwarded request, we need to override the dequeue time of 
the underlying request to match the same value as the envelope. Otherwise, the 
field is left unset, which causes inaccurate reporting.

Reviewers; Jason Gustafson 
---
 core/src/main/scala/kafka/server/EnvelopeUtils.scala  |  5 -
 core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 10 +++---
 core/src/test/scala/unit/kafka/utils/TestUtils.scala  |  5 -
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala 
b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
index a162ae5fe80..97c532ebb45 100644
--- a/core/src/main/scala/kafka/server/EnvelopeUtils.scala
+++ b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
@@ -84,7 +84,7 @@ object EnvelopeUtils {
 requestChannelMetrics: RequestChannel.Metrics
   ): RequestChannel.Request = {
 try {
-  new RequestChannel.Request(
+  val forwardedRequest = new RequestChannel.Request(
 processor = envelope.processor,
 context = forwardedContext,
 startTimeNanos = envelope.startTimeNanos,
@@ -93,6 +93,9 @@ object EnvelopeUtils {
 requestChannelMetrics,
 Some(envelope)
   )
+  // set the dequeue time of forwardedRequest as the value of envelope 
request
+  forwardedRequest.requestDequeueTimeNanos = 
envelope.requestDequeueTimeNanos
+  forwardedRequest
 } catch {
   case e: InvalidRequestException =>
 // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index cc34cabe05a..d176f369f8d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -307,8 +307,10 @@ class KafkaApisTest {
 Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
 val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion)
 
+val startTimeNanos = time.nanoseconds()
+val queueDurationNanos = 5 * 1000 * 1000
 val request = TestUtils.buildEnvelopeRequest(
-  alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds())
+  alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
startTimeNanos, startTimeNanos + queueDurationNanos)
 
 val capturedResponse: ArgumentCaptor[AlterConfigsResponse] = 
ArgumentCaptor.forClass(classOf[AlterConfigsResponse])
 val capturedRequest: ArgumentCaptor[RequestChannel.Request] = 
ArgumentCaptor.forClass(classOf[RequestChannel.Request])
@@ -321,6 +323,8 @@ class KafkaApisTest {
   any()
 )
 assertEquals(Some(request), capturedRequest.getValue.envelope)
+// the dequeue time of forwarded request should equals to envelop request
+assertEquals(request.requestDequeueTimeNanos, 
capturedRequest.getValue.requestDequeueTimeNanos)
 val innerResponse = capturedResponse.getValue
 val responseMap = innerResponse.data.responses().asScala.map { 
resourceResponse =>
   resourceResponse.resourceName() -> 
Errors.forCode(resourceResponse.errorCode)
@@ -397,7 +401,7 @@ class KafkaApisTest {
   .build(requestHeader.apiVersion)
 
 val request = TestUtils.buildEnvelopeRequest(
-  alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds(), fromPrivilegedListener)
+  alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds(), fromPrivilegedListener = fromPrivilegedListener)
 
 val capturedResponse: ArgumentCaptor[AbstractResponse] = 
ArgumentCaptor.forClass(classOf[AbstractResponse])
 createKafkaApis(authorizer = Some(authorizer), enableForwarding = 
true).handle(request, RequestLocal.withThreadConfinedCaching)
@@ -1614,7 +1618,7 @@ class KafkaApisTest {
 
   assertEquals(1, response.data.responses.size)
   val topicProduceResponse = response.data.responses.asScala.head
-  assertEquals(1, topicProduceResponse.partitionResponses.size)   
+  assertEquals(1, topicProduceResponse.partitionResponses.size)
   val partitionProduceResponse = 
topicProduceResponse.partitionResponses.asScala.head
   assertEquals(Errors.INVALID_PRODUCER_EPOCH, 
Errors.forCode(partitionProduceResponse.errorCode))
 }
diff --git 

[kafka] branch trunk updated: KAFKA-10199: Remove call to Task#completeRestoration from state updater (#12379)

2022-07-06 Thread cadonna
This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 00f395bb889 KAFKA-10199: Remove call to Task#completeRestoration from 
state updater (#12379)
00f395bb889 is described below

commit 00f395bb889252e3efb62cedfaa1209b58d5fd3c
Author: Bruno Cadonna 
AuthorDate: Wed Jul 6 12:36:15 2022 +0200

KAFKA-10199: Remove call to Task#completeRestoration from state updater 
(#12379)

The call to Task#completeRestoration calls methods on the main consumer.
The state updater thread should not access the main consumer since the
main consumer is not thread-safe. Additionally, Task#completeRestoration
changed the state of active tasks, but we decided to keep task life cycle
management outside of the state updater.

Task#completeRestoration should be called by the stream thread on
restored active tasks returned by the state udpater.

Reviewer: Guozhang Wang 
---
 .../processor/internals/DefaultStateUpdater.java   | 13 ++---
 .../internals/DefaultStateUpdaterTest.java | 31 +-
 2 files changed, 9 insertions(+), 35 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 886a37b3140..3969a911f33 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -60,16 +59,12 @@ public class DefaultStateUpdater implements StateUpdater {
 
 private final ChangelogReader changelogReader;
 private final AtomicBoolean isRunning = new AtomicBoolean(true);
-private final Consumer> offsetResetter;
 private final Map updatingTasks = new 
ConcurrentHashMap<>();
 private final Logger log;
 
-public StateUpdaterThread(final String name,
-  final ChangelogReader changelogReader,
-  final Consumer> 
offsetResetter) {
+public StateUpdaterThread(final String name, final ChangelogReader 
changelogReader) {
 super(name);
 this.changelogReader = changelogReader;
-this.offsetResetter = offsetResetter;
 
 final String logPrefix = String.format("%s ", name);
 final LogContext logContext = new LogContext(logPrefix);
@@ -286,7 +281,6 @@ public class DefaultStateUpdater implements StateUpdater {
   final Set 
restoredChangelogs) {
 final Collection taskChangelogPartitions = 
task.changelogPartitions();
 if (restoredChangelogs.containsAll(taskChangelogPartitions)) {
-task.completeRestoration(offsetResetter);
 task.maybeCheckpoint(true);
 addToRestoredTasks(task);
 updatingTasks.remove(task.id());
@@ -332,7 +326,6 @@ public class DefaultStateUpdater implements StateUpdater {
 
 private final Time time;
 private final ChangelogReader changelogReader;
-private final Consumer> offsetResetter;
 private final Queue tasksAndActions = new LinkedList<>();
 private final Lock tasksAndActionsLock = new ReentrantLock();
 private final Condition tasksAndActionsCondition = 
tasksAndActionsLock.newCondition();
@@ -350,17 +343,15 @@ public class DefaultStateUpdater implements StateUpdater {
 
 public DefaultStateUpdater(final StreamsConfig config,
final ChangelogReader changelogReader,
-   final Consumer> 
offsetResetter,
final Time time) {
 this.changelogReader = changelogReader;
-this.offsetResetter = offsetResetter;
 this.time = time;
 this.commitIntervalMs = 
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
 }
 
 public void start() {
 if (stateUpdaterThread == null) {
-stateUpdaterThread = new StateUpdaterThread("state-updater", 
changelogReader, offsetResetter);
+stateUpdaterThread = new StateUpdaterThread("state-updater", 
changelogReader);
 stateUpdaterThread.start();
 shutdownGate = new CountDownLatch(1);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java