[GitHub] [kafka] vamossagar12 commented on pull request #12971: KAFKA-14454: Making unique StreamsConfig for tests

2022-12-08 Thread GitBox


vamossagar12 commented on PR #12971:
URL: https://github.com/apache/kafka/pull/12971#issuecomment-1343960862

   Looks like the commit failed after one of the suggestions. Will push a fix for that shortly.





[GitHub] [kafka] ableegoldman commented on a diff in pull request #12971: KAFKA-14454: Making unique StreamsConfig for tests

2022-12-08 Thread GitBox


ableegoldman commented on code in PR #12971:
URL: https://github.com/apache/kafka/pull/12971#discussion_r1044153926


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##
@@ -196,19 +202,16 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultipl
 
 for (final KafkaStreams stream: kafkaStreamsList) {
 stream.setUncaughtExceptionHandler(e -> {
-assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
+Assertions.assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
 return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 });
 }
 
 startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
 
-// Sleeping to let the processing happen inducing the failure
-Thread.sleep(6);
+// the streams applications should in failed state due to the IllegalStateException.
+waitForApplicationState(Arrays.asList(streams, streamsTwo, streamsThree), KafkaStreams.State.ERROR, ofSeconds(30));

Review Comment:
   nit: no idea if we actually used the full 60s that we used to sleep, but let's continue to give it 60s



##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##
@@ -196,19 +202,16 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultipl
 
 for (final KafkaStreams stream: kafkaStreamsList) {
 stream.setUncaughtExceptionHandler(e -> {
-assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
+Assertions.assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
 return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 });
 }
 
 startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
 
-// Sleeping to let the processing happen inducing the failure
-Thread.sleep(6);
+// the streams applications should in failed state due to the IllegalStateException.

Review Comment:
   ```suggestion
   // the streams applications should have shut down into `ERROR` due to the IllegalStateException
   ```






[GitHub] [kafka] ijuma commented on a diff in pull request #12951: MINOR: Move MetadataQuorumCommand from `core` to `tools`

2022-12-08 Thread GitBox


ijuma commented on code in PR #12951:
URL: https://github.com/apache/kafka/pull/12951#discussion_r1044153255


##
bin/kafka-metadata-quorum.sh:
##
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.MetadataQuorumCommand "$@"

Review Comment:
   Good catch! Fixed.






[GitHub] [kafka] ableegoldman commented on a diff in pull request #12971: KAFKA-14454: Making unique StreamsConfig for tests

2022-12-08 Thread GitBox


ableegoldman commented on code in PR #12971:
URL: https://github.com/apache/kafka/pull/12971#discussion_r1044152824


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##
@@ -196,19 +202,16 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultipl
 
 for (final KafkaStreams stream: kafkaStreamsList) {
 stream.setUncaughtExceptionHandler(e -> {
-assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
+Assertions.assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());

Review Comment:
   We've been moving to `assertThat` instead of `assertEquals` and co:
   ```suggestion
   assertThat(e.getCause().getMessage(), equalTo("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set"));
   ```
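
   For reference, a self-contained sketch of the Hamcrest style being suggested (assuming Hamcrest is on the test classpath, as it is for Kafka's tests; the class and wiring below are illustrative, not the PR's code):

   ```java
   import static org.hamcrest.MatcherAssert.assertThat;
   import static org.hamcrest.Matchers.equalTo;

   public class AssertStyleSketch {
       public static void main(String[] args) {
           Throwable cause = new IllegalArgumentException(
               "The partitions returned by StreamPartitioner#partitions method "
                   + "when used for FK join should be a singleton set");
           Exception e = new RuntimeException(cause);
           // assertThat takes (actual, matcher) and yields a descriptive failure
           // message, unlike assertEquals(expected, actual).
           assertThat(e.getCause().getMessage(), equalTo(cause.getMessage()));
       }
   }
   ```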






[GitHub] [kafka] ijuma commented on pull request #12937: KAFKA-13881: Add Connect package infos

2022-12-08 Thread GitBox


ijuma commented on PR #12937:
URL: https://github.com/apache/kafka/pull/12937#issuecomment-1343936394

   Thanks for improving the javadocs!





[GitHub] [kafka] vamossagar12 opened a new pull request, #12971: KAFKA-14454: Making unique StreamsConfig for tests

2022-12-08 Thread GitBox


vamossagar12 opened a new pull request, #12971:
URL: https://github.com/apache/kafka/pull/12971

   Newly added test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, added as part of KIP-837, passes when run individually but fails when run as part of the IT class, and hence is marked as Ignored.
   
   That appears to be because of the way StreamsConfig was being initialised: every test ended up using the same application name, so the second test never reached the desired state. With this PR, every test gets a unique app name, which seems to have fixed the issue. The PR also includes a couple of cosmetic changes.
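
   A minimal sketch of the unique-name idea (names assumed for illustration; this is not the PR's actual code):

   ```java
   import java.util.Properties;
   import java.util.UUID;
   import org.apache.kafka.streams.StreamsConfig;

   public class UniqueAppIdSketch {
       // Builds StreamsConfig properties with a fresh application.id per call,
       // so consumer groups and state from one test cannot collide with the next.
       static Properties uniqueStreamsConfig(String testName, String bootstrapServers) {
           Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, testName + "-" + UUID.randomUUID());
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
           return props;
       }

       public static void main(String[] args) {
           System.out.println(uniqueStreamsConfig("fk-join-it", "localhost:9092"));
       }
   }
   ```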
   





[jira] [Updated] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individua

2022-12-08 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14454:
--
Description: 
Newly added test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, added as part of KIP-837, passes when run individually but fails when run as part of the IT class, and hence is marked as Ignored.

 

  was:
Newly added test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, added as part of KIP-837, passes when run individually but fails when run as part of the IT class, and hence is marked as Ignored.

As part of this ticket, we can also look to move this class to JUnit5 annotations, since it currently relies on JUnit4 ones.


> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  passes when run individually but not when run as part of the IT
> --
>
> Key: KAFKA-14454
> URL: https://issues.apache.org/jira/browse/KAFKA-14454
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Newly added test 
> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, 
> added as part of KIP-837, passes when run individually but fails when run as 
> part of the IT class, and hence is marked as Ignored. 
>  





[GitHub] [kafka] aglicacha commented on pull request #12729: KAFKA-14285: Delete quota node in zookeeper when configs are empty

2022-12-08 Thread GitBox


aglicacha commented on PR #12729:
URL: https://github.com/apache/kafka/pull/12729#issuecomment-1343708253

   > @aglicacha , there is a compile error, could you take a look?
   
   The error was caused by the unnecessary default value of the param isUserClientId in the method tryCleanQuotaNodes. I have fixed it. Thanks.





[GitHub] [kafka] jolshan commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side

2022-12-08 Thread GitBox


jolshan commented on code in PR #12968:
URL: https://github.com/apache/kafka/pull/12968#discussion_r1043958866


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
 val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
 if (block == null) {
-  throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")

Review Comment:
   I'd assume it's caught in the same place as the other error, since they both end up getting thrown here.
   
   I wouldn't expect to see the request timed out error get hit if the errors are caught.
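
   For context, a Java rendering of the control flow under discussion (the real code is Scala in ProducerIdManager.scala; the names here are illustrative assumptions, not Kafka's API):

   ```java
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class BlockingPollSketch {
       // Holds the next pre-fetched producer ID block, if any.
       private final BlockingQueue<long[]> nextBlockQueue = new ArrayBlockingQueue<>(1);

       // Waits up to maxWaitMs for the next block; a null poll result means the
       // timeout elapsed, which is surfaced to the caller as an exception that
       // some layer above (KafkaApis in the real code path) must handle.
       long[] awaitNextBlock(long maxWaitMs) throws InterruptedException, TimeoutException {
           long[] block = nextBlockQueue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
           if (block == null)
               throw new TimeoutException("Timed out waiting for next producer ID block");
           return block;
       }
   }
   ```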






[GitHub] [kafka] akhileshchg commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-08 Thread GitBox


akhileshchg commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1042776473


##
clients/src/main/resources/common/message/EnvelopeRequest.json:
##
@@ -16,10 +16,11 @@
 {
   "apiKey": 58,
   "type": "request",
-  "listeners": ["controller"],
+  "listeners": ["controller", "zkBroker"],
   "name": "EnvelopeRequest",
   // Request struct for forwarding.
-  "validVersions": "0",
+  // Version 1 adds the envelope request support to ZK brokers as well.
+  "validVersions": "0-1",

Review Comment:
   That makes sense. Let me revert the version to 0.



##
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -37,42 +38,55 @@ import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
+case class ControllerInformation(node: Option[Node],
+ listenerName: ListenerName,
+ securityProtocol: SecurityProtocol,
+ saslMechanism: String,
+ isZkController: Boolean)
+
 trait ControllerNodeProvider {
   def get(): Option[Node]
-  def listenerName: ListenerName
-  def securityProtocol: SecurityProtocol
-  def saslMechanism: String
-}
-
-object MetadataCacheControllerNodeProvider {
-  def apply(
-config: KafkaConfig,
-metadataCache: kafka.server.MetadataCache
-  ): MetadataCacheControllerNodeProvider = {
-val listenerName = config.controlPlaneListenerName
-  .getOrElse(config.interBrokerListenerName)
-
-val securityProtocol = config.controlPlaneSecurityProtocol
-  .getOrElse(config.interBrokerSecurityProtocol)
-
-new MetadataCacheControllerNodeProvider(
-  metadataCache,
-  listenerName,
-  securityProtocol,
-  config.saslMechanismInterBrokerProtocol
-)
-  }
+  def getControllerInfo(): ControllerInformation
 }
 
 class MetadataCacheControllerNodeProvider(
-  val metadataCache: kafka.server.MetadataCache,
-  val listenerName: ListenerName,
-  val securityProtocol: SecurityProtocol,
-  val saslMechanism: String
+  val metadataCache: ZkMetadataCache,
+  val config: KafkaConfig
 ) extends ControllerNodeProvider {
+
+  def listenerName(isZkController: Boolean): ListenerName = {

Review Comment:
   Whether this is part of the metadata cache or the node provider, we still need to check for the listener name, security protocol, and SASL mechanism needed to communicate with the controller, based on whether we're trying to connect to the ZK controller or the KRaft controller. I am not sure I completely understood your comment.



##
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -164,25 +184,63 @@ class BrokerToControllerChannelManagerImpl(
  private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
+  @volatile private var isZkControllerThread = true
+  @volatile private var requestThread = newRequestThread()
+  @volatile private var shutdownStarted = false
 
   def start(): Unit = {
 requestThread.start()
+maybeScheduleReinitializeRequestThread()
   }
 
   def shutdown(): Unit = {
 requestThread.shutdown()
+shutdownStarted = false
 info(s"Broker to controller channel manager for $channelName shutdown")
   }
 
-  private[server] def newRequestThread = {
+  def maybeScheduleReinitializeRequestThread(): Unit = {
+// If migration is enabled for zkBroker, then we might see controller change from zk to kraft
+// and vice-versa. This periodic task takes care of setting the right channel when such
+// controller change is noticed.
+if (config.migrationEnabled && config.requiresZookeeper) {

Review Comment:
   I tried to swap out just the NetworkClient in InterBrokerSendThread. Let me know how it looks now.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12968:
URL: https://github.com/apache/kafka/pull/12968#discussion_r1043942073


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
 val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
 if (block == null) {
-  throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")

Review Comment:
   Where does this exception get caught? I wonder if it reaches the "unexpected error" handling in `KafkaApis`.






[jira] [Updated] (KAFKA-14340) KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates

2022-12-08 Thread Bart Van Bos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bart Van Bos updated KAFKA-14340:
-
Description: 
Istio and other *SPIFFE* based systems use X509 Client Certificates to provide workload ID. Kafka currently does support Client Cert based AuthN/Z and mapping to ACLs, but only by inspecting the CN field within a Client Certificate.
 * [https://spiffe.io/docs/latest/spiffe-about/spiffe-concepts/#spiffe-id] 

There are several POC implementations out there of a bespoke _KafkaPrincipalBuilder_ for this purpose. Two examples include
 * [https://github.com/traiana/kafka-spiffe-principal]
 * [https://github.com/boeboe/kafka-istio-principal-builder] (written by myself)

The gist is to introspect X509 based client certificates, look for a URI based SPIFFE entry in the SAN extension, and return that as a principal, which can then be used to write ACL rules.
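
A minimal sketch of such a principal builder (illustrative only, written against the public KafkaPrincipalBuilder API; this is not the proposed KIP-880 implementation, and error handling is elided):

```java
import java.security.cert.X509Certificate;
import java.util.List;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;

public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder {
    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        if (context instanceof SslAuthenticationContext) {
            try {
                X509Certificate cert = (X509Certificate)
                    ((SslAuthenticationContext) context).session().getPeerCertificates()[0];
                if (cert.getSubjectAlternativeNames() != null) {
                    for (List<?> san : cert.getSubjectAlternativeNames()) {
                        // SAN entry type 6 is a URI; SPIFFE IDs are URIs.
                        if ((Integer) san.get(0) == 6 && ((String) san.get(1)).startsWith("spiffe://"))
                            return new KafkaPrincipal("SPIFFE", (String) san.get(1));
                    }
                }
            } catch (Exception e) {
                // No verified peer cert or unparseable SANs: fall through.
            }
        }
        return KafkaPrincipal.ANONYMOUS;
    }
}
```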

This KIP request is to include this functionality in Kafka's core functionality, so end-users don't need to load custom and non-vetted Java classes/implementations.

The main use case for me is having a lot of Istio customers who want to be able to leverage SPIFFE based IDs for their Kafka ACL Authorization. This eliminates the need for sidecars on the broker side, or custom _EnvoyFilters_ and other less optimal implementations, to integrate Kafka into an Istio secured Kubernetes environment.
I believe this would make for a better integration between the Istio/SPIFFE and Kafka ecosystems.

 

  was:
Istio and other *SPIFFE* based systems use X509 Client Certificates to provide workload ID. Kafka currently does support Client Cert based AuthN/Z and mapping to ACLs, but only by inspecting the CN field within a Client Certificate.

There are several POC implementations out there of a bespoke _KafkaPrincipalBuilder_ for this purpose. Two examples include
 * [https://github.com/traiana/kafka-spiffe-principal]
 * [https://github.com/boeboe/kafka-istio-principal-builder] (written by myself)

The gist is to introspect X509 based client certificates, look for a URI based SPIFFE entry in the SAN extension, and return that as a principal, which can then be used to write ACL rules.

This KIP request is to include this functionality in Kafka's core functionality, so end-users don't need to load custom and non-vetted Java classes/implementations.

The main use case for me is having a lot of Istio customers who want to be able to leverage SPIFFE based IDs for their Kafka ACL Authorization. This eliminates the need for sidecars on the broker side, or custom _EnvoyFilters_ and other less optimal implementations, to integrate Kafka into an Istio secured Kubernetes environment.
I believe this would make for a better integration between the Istio/SPIFFE and Kafka ecosystems.

 


> KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates 
> ---
>
> Key: KAFKA-14340
> URL: https://issues.apache.org/jira/browse/KAFKA-14340
> Project: Kafka
>  Issue Type: Wish
>  Components: security
>Affects Versions: 3.3.1
>Reporter: Bart Van Bos
>Assignee: Bart Van Bos
>Priority: Minor
>
> Istio and other *SPIFFE* based systems use X509 Client Certificates to 
> provide workload ID. Kafka currently does support Client Cert based AuthN/Z 
> and mapping to ACLs, but only by inspecting the CN field within a Client 
> Certificate.
>  * [https://spiffe.io/docs/latest/spiffe-about/spiffe-concepts/#spiffe-id] 
> There are several POC implementations out there of a bespoke 
> _KafkaPrincipalBuilder_ for this purpose. Two examples include
>  * [https://github.com/traiana/kafka-spiffe-principal]
>  * [https://github.com/boeboe/kafka-istio-principal-builder] (written by 
> myself)
> The gist is to introspect X509 based client certificates, look for a URI 
> based SPIFFE entry in the SAN extension, and return that as a principal, 
> which can then be used to write ACL rules.
> This KIP request is to include this functionality in Kafka's core 
> functionality, so end-users don't need to load custom and non-vetted Java 
> classes/implementations.
> The main use case for me is having a lot of Istio customers who want to be 
> able to leverage SPIFFE based IDs for their Kafka ACL Authorization. This 
> eliminates the need for sidecars on the broker side, or custom _EnvoyFilters_ 
> and other less optimal implementations, to integrate Kafka into an Istio 
> secured Kubernetes environment. 
> I believe this would make for a better integration between the Istio/SPIFFE 
> and Kafka ecosystems.
>  





[jira] [Assigned] (KAFKA-14340) KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates

2022-12-08 Thread Bart Van Bos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bart Van Bos reassigned KAFKA-14340:


Assignee: Bart Van Bos

> KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates 
> ---
>
> Key: KAFKA-14340
> URL: https://issues.apache.org/jira/browse/KAFKA-14340
> Project: Kafka
>  Issue Type: Wish
>  Components: security
>Affects Versions: 3.3.1
>Reporter: Bart Van Bos
>Assignee: Bart Van Bos
>Priority: Minor
>
> Istio and other *SPIFFE* based systems use X509 Client Certificates to 
> provide workload ID. Kafka currently does support Client Cert based AuthN/Z 
> and mapping to ACLs, but only by inspecting the CN field within a Client 
> Certificate.
> There are several POC implementations out there of a bespoke 
> _KafkaPrincipalBuilder_ for this purpose. Two examples include
>  * [https://github.com/traiana/kafka-spiffe-principal]
>  * [https://github.com/boeboe/kafka-istio-principal-builder] (written by 
> myself)
> The gist is to introspect X509 based client certificates, look for a URI 
> based SPIFFE entry in the SAN extension, and return that as a principal, 
> which can then be used to write ACL rules.
> This KIP request is to include this functionality in Kafka's core 
> functionality, so end-users don't need to load custom and non-vetted Java 
> classes/implementations.
> The main use case for me is having a lot of Istio customers who want to be 
> able to leverage SPIFFE based IDs for their Kafka ACL Authorization. This 
> eliminates the need for sidecars on the broker side, or custom _EnvoyFilters_ 
> and other less optimal implementations, to integrate Kafka into an Istio 
> secured Kubernetes environment. 
> I believe this would make for a better integration between the Istio/SPIFFE 
> and Kafka ecosystems.
>  





[GitHub] [kafka] philipnee commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-08 Thread GitBox


philipnee commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043939575


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link RequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait
+ * timer, or a singleton list of
+ * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final String groupId;
+
+private final RequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final String groupId,
+ final long rebalanceTimeoutMs) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = new RequestState(config);
+}
+
+// Visible for testing
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final String groupId,
+  final long rebalanceTimeoutMs,
+  final RequestState coordinatorRequestState) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (this.coordinator != null) {
+return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
+}
+
+if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043936750


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link RequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait
+ * timer, or a singleton list of
+ * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;

Review Comment:
   Seems like we can get rid of this dependence?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+ 

[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller

2022-12-08 Thread GitBox


cmccabe commented on code in PR #12965:
URL: https://github.com/apache/kafka/pull/12965#discussion_r1043911867


##
clients/src/main/resources/common/message/BrokerRegistrationRequest.json:
##
@@ -51,7 +51,7 @@
 },
 { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": 
"0+",
   "about": "The rack which this broker is in." },
-{ "name": "IsMigratingZkBroker", "type": "int8", "versions": "0+", 
"taggedVersions": "0+", "tag": 0, "ignorable": true,
+{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "0+", 
"taggedVersions": "0+", "tag": 0, "ignorable": true,

Review Comment:
   see my comment in the main PR conversation thread: this should not be tagged, and we need a new version of the RPC ...






[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller

2022-12-08 Thread GitBox


cmccabe commented on code in PR #12965:
URL: https://github.com/apache/kafka/pull/12965#discussion_r1043911573


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -690,6 +772,14 @@ class KafkaServer(
 
   _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
 
+  if (config.migrationEnabled && lifecycleManager != null) {
+// TODO KAFKA-14447 Only use KRaft controlled shutdown (when in migration mode)

Review Comment:
   so the TODO here is that we need to check if the controller is a kraft one?
   
   fair enough.
   
   I keep thinking that info should be in the metadata cache ... I guess we can do that in a follow-up.






[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller

2022-12-08 Thread GitBox


cmccabe commented on code in PR #12965:
URL: https://github.com/apache/kafka/pull/12965#discussion_r1043911086


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -453,6 +534,16 @@ class KafkaServer(
 dynamicConfigManager = new ZkConfigManager(zkClient, dynamicConfigHandlers)
 dynamicConfigManager.startup()
 
+if (config.migrationEnabled && lifecycleManager != null) {
+  lifecycleManager.initialCatchUpFuture.whenComplete { case (_, t) =>
+if (t != null) {
+  fatal("Encountered an exception when waiting to catch up with 
KRaft metadata log", t)
+  shutdown()
+} else {
+  lifecycleManager.setReadyToUnfence()

Review Comment:
   can we have a log message here (INFO) ?






[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller

2022-12-08 Thread GitBox


cmccabe commented on code in PR #12965:
URL: https://github.com/apache/kafka/pull/12965#discussion_r1043910049


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -81,6 +84,19 @@ object KafkaServer {
 clientConfig
   }
 
+  def zkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {

Review Comment:
   Hmm. Maybe I'm missing something, but I don't see any relationship with `KafkaServer`. Why put this in `KafkaServer.scala`? Can this be a `static` function in `KafkaZkClient.scala` (aka a function on `object KafkaZkClient`)?






[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller

2022-12-08 Thread GitBox


cmccabe commented on code in PR #12965:
URL: https://github.com/apache/kafka/pull/12965#discussion_r1043908893


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -2230,8 +2230,15 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   validateAdvertisedListenersNonEmptyForBroker()
 } else {
   // ZK-based
-  // controller listener names must be empty when not in KRaft mode
-  require(controllerListenerNames.isEmpty, s"${KafkaConfig.ControllerListenerNamesProp} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
+  if (migrationEnabled) {
+validateNonEmptyQuorumVotersForKRaft()
+require(controllerListenerNames.nonEmpty,

Review Comment:
   This seems like a bug: in migration mode you want it to be non-empty, right?






[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller

2022-12-08 Thread GitBox


cmccabe commented on code in PR #12965:
URL: https://github.com/apache/kafka/pull/12965#discussion_r1043907559


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -53,7 +53,8 @@ import scala.jdk.CollectionConverters._
  */
 class BrokerLifecycleManager(val config: KafkaConfig,

Review Comment:
   can you use the standard style of
   ```
   foo(
 bar
 baz
 quux
   ) {
   }
   ```
   
   this code was written before we adopted that, but if we're adding new parameters let's format it that way to avoid having so much indentation






[GitHub] [kafka] cmccabe commented on pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller

2022-12-08 Thread GitBox


cmccabe commented on PR #12965:
URL: https://github.com/apache/kafka/pull/12965#issuecomment-1343489745

   Thanks for the PR!
   
   > This patch uses the new isMigratingZkBroker field in BrokerRegistrationRequest and RegisterBrokerRecord. The type was changed from int8 to bool for BrokerRegistrationRequest (a mistake from https://github.com/apache/kafka/pull/12860).
   
   I think there is one other major mistake we made there: this should not be a tagged field, but should be a new version of the RPC. The reason is that this is not safe to ignore. If an old controller treats this broker like an ordinary KRaft broker, that would be a big mistake.
   
   While a new version of `BrokerRegistrationRequest` may seem difficult to implement, I think it's actually simpler than it seems. The `NetworkClient` itself should have the `ApiVersions` of the controller available. It gets this when it first connects to the controller. So we can simply take the highest registration version supported by both our broker and the controller.
   
   There is already code to do all this in `BrokerRegistrationRequest#Builder`. The only complication is that if you need to set `isZkBroker`, you need to set `BrokerRegistrationRequest#Builder.oldestAllowedVersion` to 1 rather than 0.
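
   A hedged sketch of that negotiation (illustrative only; `NodeApiVersions#latestUsableVersion` is the real client-side lookup, but the surrounding method and checks here are assumptions, not the PR's code):

   ```java
   import org.apache.kafka.clients.NodeApiVersions;
   import org.apache.kafka.common.protocol.ApiKeys;

   public class RegistrationVersionSketch {
       // Picks the highest BrokerRegistration version both sides support, and
       // insists on at least v1 when registering as a migrating ZK broker,
       // since a v0 controller could not see the isMigratingZkBroker flag.
       static short chooseRegistrationVersion(NodeApiVersions controllerVersions, boolean isZkBroker) {
           short version = controllerVersions.latestUsableVersion(ApiKeys.BROKER_REGISTRATION);
           if (isZkBroker && version < 1)
               throw new IllegalStateException("Controller does not support registering a migrating ZK broker");
           return version;
       }
   }
   ```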





[GitHub] [kafka] artemlivshits commented on a diff in pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

2022-12-08 Thread GitBox


artemlivshits commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1043896712


##
clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java:
##
@@ -208,10 +209,10 @@ public static MetadataResponse metadataUpdateWith(final String clusterId,
 for (int i = 0; i < numPartitions; i++) {
 TopicPartition tp = new TopicPartition(topic, i);
 Node leader = nodes.get(i % nodes.size());
-List<Integer> replicaIds = Collections.singletonList(leader.id());
+List<Integer> replicaIds = nodes.stream().map(Node::id).collect(Collectors.toList());
 partitionMetadata.add(partitionSupplier.supply(
 Errors.NONE, tp, Optional.of(leader.id()), Optional.ofNullable(epochSupplier.apply(tp)),
-replicaIds, replicaIds, replicaIds));
+replicaIds, replicaIds, Collections.emptyList()));

Review Comment:
   Not sure about this change, looks like a re-base mistake. Will take a closer look.






[GitHub] [kafka] ijuma commented on a diff in pull request #12967: MINOR: Replace ArrayBuffer with ListBuffer for better performance

2022-12-08 Thread GitBox


ijuma commented on code in PR #12967:
URL: https://github.com/apache/kafka/pull/12967#discussion_r1043889697


##
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##
@@ -484,10 +484,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
* group does not know, because the information is not available yet or because the it has
* failed to parse the Consumer Protocol, it returns true to be safe.
*/
-  def isSubscribedToTopic(topic: String): Boolean = subscribedTopics match {
-case Some(topics) => topics.contains(topic)
-case None => true
-  }
+  def isSubscribedToTopic(topic: String): Boolean = subscribedTopics.forall(topics => topics.contains(topic))

Review Comment:
   How is this related to the PR description? Seems like an unrelated clean-up? Is your before/after test including these changes? Many of them _can_ result in additional allocations since you're replacing pattern matching with lambdas.






[GitHub] [kafka] ijuma commented on pull request #12967: MINOR: Replace ArrayBuffer with ListBuffer for better performance

2022-12-08 Thread GitBox


ijuma commented on PR #12967:
URL: https://github.com/apache/kafka/pull/12967#issuecomment-1343464276

   > The data structure is converted to Java's List from scala.
   
   Why is this? Java's `ArrayList` is array-backed, just like `ArrayBuffer`, while `ListBuffer` is a linked list.
   
   Generally, linked lists are worse due to cache effects, and `ArrayBuffer` has amortized constant append too.
   
   One case where `ListBuffer` is better is when `ArrayBuffer` usage results in unnecessary allocations (i.e. small lists). A simple way of solving that for `ArrayBuffer` is to size the underlying array to be smaller.
   
   I haven't looked at the results in detail yet, but I wanted to share this as I am a bit confused about the PR description.
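
   In Java terms the same trade-off looks roughly like this (illustrative only; the PR itself concerns Scala's `ArrayBuffer`/`ListBuffer`):

   ```java
   import java.util.ArrayList;
   import java.util.LinkedList;
   import java.util.List;

   public class AppendTradeOffSketch {
       public static void main(String[] args) {
           // Array-backed: contiguous memory, cache friendly, amortized O(1) append.
           List<Integer> arrayBacked = new ArrayList<>();
           // Linked: O(1) append, but one node allocation per element and poor locality.
           List<Integer> linked = new LinkedList<>();
           for (int i = 0; i < 1_000_000; i++) {
               arrayBacked.add(i);
               linked.add(i);
           }
           // For the small-list case mentioned above: pre-size the backing array
           // instead of switching to a linked structure.
           List<Integer> small = new ArrayList<>(4);
           small.add(42);
           System.out.println(arrayBacked.size() + " " + linked.size() + " " + small.size());
       }
   }
   ```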





[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043805938


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+
+// Visible for testing

Review Comment:
   We can get rid of these comments. A general-purpose javadoc would be good though.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043805938


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+
+// Visible for testing

Review Comment:
   We can get rid of these comments






[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

2022-12-08 Thread GitBox


dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043802283


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-new DescribeGroupsResponse(describeGroupsResponseData)
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
-}
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
 val describeRequest = request.body[DescribeGroupsRequest]
-val describeGroupsResponseData = new DescribeGroupsResponseData()
+val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+  describeRequest.data.groups.size
+)
 
 describeRequest.data.groups.forEach { groupId =>
   if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+  groupId,
+  Errors.GROUP_AUTHORIZATION_FAILED
+))
   } else {
-val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-val members = summary.members.map { member =>
-  new DescribeGroupsResponseData.DescribedGroupMember()
-.setMemberId(member.memberId)
-.setGroupInstanceId(member.groupInstanceId.orNull)
-.setClientId(member.clientId)
-.setClientHost(member.clientHost)
-.setMemberAssignment(member.assignment)
-.setMemberMetadata(member.metadata)
-}
-
-val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-  .setErrorCode(error.code)
-  .setGroupId(groupId)
-  .setGroupState(summary.state)
-  .setProtocolType(summary.protocolType)
-  .setProtocolData(summary.protocol)
-  .setMembers(members.asJava)
-
-if (request.header.apiVersion >= 3) {
-  if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+futures += newGroupCoordinator.describeGroup(
+  request.context,
+  groupId
+).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+  if (exception != null) {
+DescribeGroupsResponse.forError(groupId, Errors.forException(exception))

Review Comment:
   I did the refactor separately: https://github.com/apache/kafka/pull/12970.
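
   For readers following along, a simplified sketch of the per-group future pattern in the diff above (names shortened; illustrative Java, not the Scala in KafkaApis):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.stream.Collectors;

   public class PerGroupFuturesSketch {
       static final class DescribedGroup {
           final String groupId;
           final String error;
           DescribedGroup(String groupId, String error) { this.groupId = groupId; this.error = error; }
       }

       static CompletableFuture<List<DescribedGroup>> describeAll(List<String> groupIds) {
           List<CompletableFuture<DescribedGroup>> futures = new ArrayList<>(groupIds.size());
           for (String groupId : groupIds) {
               if (!authorized(groupId)) {
                   // Authorization failures complete immediately, as in the diff.
                   futures.add(CompletableFuture.completedFuture(
                       new DescribedGroup(groupId, "GROUP_AUTHORIZATION_FAILED")));
               } else {
                   // handle() maps either the result or the exception to a response entry.
                   futures.add(describeGroup(groupId).handle((group, t) ->
                       t == null ? group : new DescribedGroup(groupId, t.getMessage())));
               }
           }
           // Assemble the response only once every per-group future has completed.
           return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
               .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
       }

       static boolean authorized(String groupId) { return !groupId.startsWith("secret-"); }

       static CompletableFuture<DescribedGroup> describeGroup(String groupId) {
           return CompletableFuture.completedFuture(new DescribedGroup(groupId, "NONE"));
       }
   }
   ```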






[GitHub] [kafka] dajac opened a new pull request, #12970: MINOR: Small refactor in DescribeGroupsResponse

2022-12-08 Thread GitBox


dajac opened a new pull request, #12970:
URL: https://github.com/apache/kafka/pull/12970

   This patch does a few cleanups:
   * It removes `DescribeGroupsResponse.fromError` and pushes its logic to `DescribeGroupsRequest.getErrorResponse` to be consistent with how we implemented the other requests/responses.
   * It renames `DescribedGroup.forError` to `DescribedGroup.groupError`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] philipnee commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-08 Thread GitBox


philipnee commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043793970


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final String groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // start logging a warning only 
after being unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final String groupId,
+ final long rebalanceTimeoutMs) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+// Visible for testing
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final String groupId,
+  final long rebalanceTimeoutMs,
+  final CoordinatorRequestState 
coordinatorRequestState) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (this.coordinator != null) {
+return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+ 
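
   A minimal, self-contained sketch of the three-step poll() decision the
   javadoc above describes. All names here (`RequestState`,
   `SendFindCoordinator`, the backoff arithmetic) are illustrative stand-ins,
   not the actual client internals:

   ```scala
   // Sketch of the poll() control flow described in the javadoc:
   // 1) coordinator already known -> nothing to send, wait "forever";
   // 2) request inflight or backoff not yet expired -> return a wait timer;
   // 3) otherwise -> emit a single FindCoordinator request.
   object FindCoordinatorPollSketch {
     sealed trait PollResult
     final case class Wait(timeUntilNextPollMs: Long) extends PollResult
     case object SendFindCoordinator extends PollResult

     final class RequestState(val backoffMs: Long) {
       var inflight: Boolean = false
       var lastSentMs: Long = -1L
       def canSend(nowMs: Long): Boolean =
         !inflight && (lastSentMs < 0 || nowMs - lastSentMs >= backoffMs)
       def remainingBackoffMs(nowMs: Long): Long =
         math.max(0L, lastSentMs + backoffMs - nowMs)
     }

     def poll(coordinatorKnown: Boolean, state: RequestState, nowMs: Long): PollResult =
       if (coordinatorKnown) Wait(Long.MaxValue)            // nothing to find
       else if (!state.canSend(nowMs)) Wait(state.remainingBackoffMs(nowMs))
       else {
         state.inflight = true
         state.lastSentMs = nowMs
         SendFindCoordinator
       }

     def main(args: Array[String]): Unit = {
       val state = new RequestState(backoffMs = 100L)
       println(poll(coordinatorKnown = false, state, nowMs = 0L))  // SendFindCoordinator
       println(poll(coordinatorKnown = false, state, nowMs = 50L)) // Wait(50)
     }
   }
   ```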

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043787431


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request.
+ * 3. If the backoff timer has expired.
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final String groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // start logging a warning only 
after being unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final String groupId,
+ final long rebalanceTimeoutMs) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+// Visible for testing
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final String groupId,
+  final long rebalanceTimeoutMs,
+  final CoordinatorRequestState 
coordinatorRequestState) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (this.coordinator != null) {
+return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+ 

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1043785411


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request.
+ * 3. If the backoff timer has expired.
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final String groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // start logging a warning only 
after being unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final String groupId,
+ final long rebalanceTimeoutMs) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+// Visible for testing
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final String groupId,
+  final long rebalanceTimeoutMs,
+  final CoordinatorRequestState 
coordinatorRequestState) {
+Objects.requireNonNull(groupId);
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (this.coordinator != null) {
+return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+ 

[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

2022-12-08 Thread GitBox


dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043782746


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-new DescribeGroupsResponse(describeGroupsResponseData)
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
-}
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val describeRequest = request.body[DescribeGroupsRequest]
-val describeGroupsResponseData = new DescribeGroupsResponseData()
+val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
+val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+  describeRequest.data.groups.size
+)
 
 describeRequest.data.groups.forEach { groupId =>
   if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+  groupId,
+  Errors.GROUP_AUTHORIZATION_FAILED
+))
   } else {
-val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-val members = summary.members.map { member =>
-  new DescribeGroupsResponseData.DescribedGroupMember()
-.setMemberId(member.memberId)
-.setGroupInstanceId(member.groupInstanceId.orNull)
-.setClientId(member.clientId)
-.setClientHost(member.clientHost)
-.setMemberAssignment(member.assignment)
-.setMemberMetadata(member.metadata)
-}
-
-val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-  .setErrorCode(error.code)
-  .setGroupId(groupId)
-  .setGroupState(summary.state)
-  .setProtocolType(summary.protocolType)
-  .setProtocolData(summary.protocol)
-  .setMembers(members.asJava)
-
-if (request.header.apiVersion >= 3) {
-  if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
-
describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, 
new Resource(ResourceType.GROUP, groupId)))
+futures += newGroupCoordinator.describeGroup(

Review Comment:
   Yeah, I think that I am with you on this point. Keeping `KafkaApis` simple 
makes sense. Let me give it a shot tomorrow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043778147


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-new DescribeGroupsResponse(describeGroupsResponseData)
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
-}
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val describeRequest = request.body[DescribeGroupsRequest]
-val describeGroupsResponseData = new DescribeGroupsResponseData()
+val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
+val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+  describeRequest.data.groups.size
+)
 
 describeRequest.data.groups.forEach { groupId =>
   if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+  groupId,
+  Errors.GROUP_AUTHORIZATION_FAILED
+))
   } else {
-val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-val members = summary.members.map { member =>
-  new DescribeGroupsResponseData.DescribedGroupMember()
-.setMemberId(member.memberId)
-.setGroupInstanceId(member.groupInstanceId.orNull)
-.setClientId(member.clientId)
-.setClientHost(member.clientHost)
-.setMemberAssignment(member.assignment)
-.setMemberMetadata(member.metadata)
-}
-
-val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-  .setErrorCode(error.code)
-  .setGroupId(groupId)
-  .setGroupState(summary.state)
-  .setProtocolType(summary.protocolType)
-  .setProtocolData(summary.protocol)
-  .setMembers(members.asJava)
-
-if (request.header.apiVersion >= 3) {
-  if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
-
describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, 
new Resource(ResourceType.GROUP, groupId)))
+futures += newGroupCoordinator.describeGroup(

Review Comment:
   I don't have a strong opinion, but I slightly prefer keeping `KafkaApis` 
simple and having the complexity in the coordinator implementation. I also 
wonder if that makes optimization easier. For example, even if we have fanout 
to the respective coordinators, it may still be the case that multiple groups 
are handled by the same coordinator. Perhaps that would mean fewer messages on 
the respective queues? Not sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #12969: MINOR: Small refactor in ApiVersionManager

2022-12-08 Thread GitBox


dajac opened a new pull request, #12969:
URL: https://github.com/apache/kafka/pull/12969

   For KIP-848, I would like to check in the new APIs but without exposing them 
while the KIP is in development. At first, my goal is to gate them by an 
internal configuration flag defined in KafkaConfig. Later on, this should be 
dynamic, either based on MetadataVersion or another feature flag.
   
   This patch is a small refactor in `ApiVersionManager` to allow filtering 
APIs. The basic idea is to filter them based on KafkaConfig 
[here](https://github.com/apache/kafka/compare/trunk...dajac:kafka:minor-refacor-apiversion-manager?expand=1#diff-b89d6d5e31fa117d2dfb7d242c5035c18556c783f58360ca3b42801100640090R44)
 so instead of letting the `ApiVersionManager` determine the available APIs 
based on the listener type, we give it the allowed set. This is definitely not 
the end state that I want, but it is a first step towards it. It will at least 
allow me to get these APIs checked in safely. We can come back to the dynamic 
part of it a bit later.
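
   A minimal sketch of the filtering idea described above, assuming a
   hypothetical `enableUnstableApis` flag and a simplified `ApiKey` stand-in
   (none of these names come from the PR itself):

   ```scala
   // Hypothetical sketch: gate a set of protocol APIs behind a config flag.
   object ApiFilterSketch {
     sealed trait ApiKey
     case object Produce extends ApiKey
     case object Fetch extends ApiKey
     case object ConsumerGroupHeartbeat extends ApiKey // e.g. a new KIP-848 API

     private val UnstableApis: Set[ApiKey] = Set(ConsumerGroupHeartbeat)

     // Instead of deriving the available APIs from the listener type alone,
     // the caller passes in the allowed set, computed from configuration.
     def enabledApis(allApis: Set[ApiKey], enableUnstableApis: Boolean): Set[ApiKey] =
       if (enableUnstableApis) allApis else allApis.diff(UnstableApis)

     def main(args: Array[String]): Unit = {
       val all: Set[ApiKey] = Set(Produce, Fetch, ConsumerGroupHeartbeat)
       println(enabledApis(all, enableUnstableApis = false)) // Set(Produce, Fetch)
       println(enabledApis(all, enableUnstableApis = true))  // all three APIs
     }
   }
   ```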
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan opened a new pull request, #12968: KAFKA-14417:Addressing the error server side

2022-12-08 Thread GitBox


jolshan opened a new pull request, #12968:
URL: https://github.com/apache/kafka/pull/12968

   After some back and forth, it seems best to return an alternative error code 
old clients can handle.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043769009


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-new DescribeGroupsResponse(describeGroupsResponseData)
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
-}
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val describeRequest = request.body[DescribeGroupsRequest]
-val describeGroupsResponseData = new DescribeGroupsResponseData()
+val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
+val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+  describeRequest.data.groups.size
+)
 
 describeRequest.data.groups.forEach { groupId =>
   if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+  groupId,
+  Errors.GROUP_AUTHORIZATION_FAILED
+))
   } else {
-val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-val members = summary.members.map { member =>
-  new DescribeGroupsResponseData.DescribedGroupMember()
-.setMemberId(member.memberId)
-.setGroupInstanceId(member.groupInstanceId.orNull)
-.setClientId(member.clientId)
-.setClientHost(member.clientHost)
-.setMemberAssignment(member.assignment)
-.setMemberMetadata(member.metadata)
-}
-
-val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-  .setErrorCode(error.code)
-  .setGroupId(groupId)
-  .setGroupState(summary.state)
-  .setProtocolType(summary.protocolType)
-  .setProtocolData(summary.protocol)
-  .setMembers(members.asJava)
-
-if (request.header.apiVersion >= 3) {
-  if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
-
describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, 
new Resource(ResourceType.GROUP, groupId)))
+futures += newGroupCoordinator.describeGroup(
+  request.context,
+  groupId
+).handle[DescribeGroupsResponseData.DescribedGroup] { (response, 
exception) =>
+  if (exception != null) {
+DescribeGroupsResponse.forError(groupId, 
Errors.forException(exception))

Review Comment:
   Yeah, I was wondering why we didn't use `getErrorResponse`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #12913: KIP-866 Allow ZK brokers to register in KRaft

2022-12-08 Thread GitBox


mumrah commented on PR #12913:
URL: https://github.com/apache/kafka/pull/12913#issuecomment-1343283462

   @mimaison here's the new PR https://github.com/apache/kafka/pull/12965


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory

2022-12-08 Thread GitBox


cadonna commented on PR #12935:
URL: https://github.com/apache/kafka/pull/12935#issuecomment-1343280080

   Backported to 3.3, 3.2, 3.1, and 3.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #12932: KAFKA-14425; The Kafka protocol should support nullable structs

2022-12-08 Thread GitBox


dajac merged PR #12932:
URL: https://github.com/apache/kafka/pull/12932


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12932: KAFKA-14425; The Kafka protocol should support nullable structs

2022-12-08 Thread GitBox


dajac commented on PR #12932:
URL: https://github.com/apache/kafka/pull/12932#issuecomment-1343231442

   The KIP has been accepted.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

2022-12-08 Thread GitBox


dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043727578


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-new DescribeGroupsResponse(describeGroupsResponseData)
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
-}
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val describeRequest = request.body[DescribeGroupsRequest]
-val describeGroupsResponseData = new DescribeGroupsResponseData()
+val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
+val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+  describeRequest.data.groups.size
+)
 
 describeRequest.data.groups.forEach { groupId =>
   if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+  groupId,
+  Errors.GROUP_AUTHORIZATION_FAILED
+))
   } else {
-val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-val members = summary.members.map { member =>
-  new DescribeGroupsResponseData.DescribedGroupMember()
-.setMemberId(member.memberId)
-.setGroupInstanceId(member.groupInstanceId.orNull)
-.setClientId(member.clientId)
-.setClientHost(member.clientHost)
-.setMemberAssignment(member.assignment)
-.setMemberMetadata(member.metadata)
-}
-
-val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-  .setErrorCode(error.code)
-  .setGroupId(groupId)
-  .setGroupState(summary.state)
-  .setProtocolType(summary.protocolType)
-  .setProtocolData(summary.protocol)
-  .setMembers(members.asJava)
-
-if (request.header.apiVersion >= 3) {
-  if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
-
describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, 
new Resource(ResourceType.GROUP, groupId)))
+futures += newGroupCoordinator.describeGroup(
+  request.context,
+  groupId
+).handle[DescribeGroupsResponseData.DescribedGroup] { (response, 
exception) =>
+  if (exception != null) {
+DescribeGroupsResponse.forError(groupId, 
Errors.forException(exception))

Review Comment:
   Sure. Let me clean this. I wonder if we should also remove `fromError` and 
push that code to `DescribeGroupsRequest.getErrorResponse` in order to be 
consistent with the other requests.
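
   For context, a rough sketch of what pushing that logic into
   `getErrorResponse` amounts to: the request maps every requested group to
   one error. The case classes below are simplified stand-ins, not the real
   generated request/response classes:

   ```scala
   // Simplified model of pushing the per-group error mapping into the
   // request's getErrorResponse, as discussed above.
   object GetErrorResponseSketch {
     final case class DescribedGroup(groupId: String, errorCode: Short)
     final case class DescribeGroupsResponse(groups: List[DescribedGroup])

     final case class DescribeGroupsRequest(groupIds: List[String]) {
       // Every group named in the request gets the same top-level error.
       def getErrorResponse(errorCode: Short): DescribeGroupsResponse =
         DescribeGroupsResponse(groupIds.map(DescribedGroup(_, errorCode)))
     }

     def main(args: Array[String]): Unit = {
       val request = DescribeGroupsRequest(List("group-1", "group-2"))
       // 30 is used here as an example error code (GROUP_AUTHORIZATION_FAILED).
       println(request.getErrorResponse(errorCode = 30))
     }
   }
   ```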



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

2022-12-08 Thread GitBox


dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043725962


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-new DescribeGroupsResponse(describeGroupsResponseData)
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
-}
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val describeRequest = request.body[DescribeGroupsRequest]
-val describeGroupsResponseData = new DescribeGroupsResponseData()
+val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
+val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+  describeRequest.data.groups.size
+)
 
 describeRequest.data.groups.forEach { groupId =>
   if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+  groupId,
+  Errors.GROUP_AUTHORIZATION_FAILED
+))
   } else {
-val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-val members = summary.members.map { member =>
-  new DescribeGroupsResponseData.DescribedGroupMember()
-.setMemberId(member.memberId)
-.setGroupInstanceId(member.groupInstanceId.orNull)
-.setClientId(member.clientId)
-.setClientHost(member.clientHost)
-.setMemberAssignment(member.assignment)
-.setMemberMetadata(member.metadata)
-}
-
-val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-  .setErrorCode(error.code)
-  .setGroupId(groupId)
-  .setGroupState(summary.state)
-  .setProtocolType(summary.protocolType)
-  .setProtocolData(summary.protocol)
-  .setMembers(members.asJava)
-
-if (request.header.apiVersion >= 3) {
-  if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
-
describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, 
new Resource(ResourceType.GROUP, groupId)))
+futures += newGroupCoordinator.describeGroup(

Review Comment:
   There is no particular reason, I guess. @hachikuji The reason I kept it 
like this is that we will need to fan out anyway with the new controller, as we 
will have multiple event loops. I suppose that we could say that this is an 
internal implementation detail, so not exposing it in the interface would make 
sense. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

2022-12-08 Thread GitBox


jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043708573


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-new DescribeGroupsResponse(describeGroupsResponseData)
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
-}
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val describeRequest = request.body[DescribeGroupsRequest]
-val describeGroupsResponseData = new DescribeGroupsResponseData()
+val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
+val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+  describeRequest.data.groups.size
+)
 
 describeRequest.data.groups.forEach { groupId =>
   if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+  groupId,
+  Errors.GROUP_AUTHORIZATION_FAILED
+))
   } else {
-val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-val members = summary.members.map { member =>
-  new DescribeGroupsResponseData.DescribedGroupMember()
-.setMemberId(member.memberId)
-.setGroupInstanceId(member.groupInstanceId.orNull)
-.setClientId(member.clientId)
-.setClientHost(member.clientHost)
-.setMemberAssignment(member.assignment)
-.setMemberMetadata(member.metadata)
-}
-
-val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-  .setErrorCode(error.code)
-  .setGroupId(groupId)
-  .setGroupState(summary.state)
-  .setProtocolType(summary.protocolType)
-  .setProtocolData(summary.protocol)
-  .setMembers(members.asJava)
-
-if (request.header.apiVersion >= 3) {
-  if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
-
describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, 
new Resource(ResourceType.GROUP, groupId)))
+futures += newGroupCoordinator.describeGroup(

Review Comment:
   Any idea why the original semantics were to handle each group one by one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14417) Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-14417.
-
Fix Version/s: 4.0.0
   3.3.2
   Resolution: Fixed

> Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats 
> as fatal error
> --
>
> Key: KAFKA-14417
> URL: https://issues.apache.org/jira/browse/KAFKA-14417
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 4.0.0, 3.3.2
>
>
> In TransactionManager we have a handler for InitProducerIdRequests 
> [https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#LL1276C14-L1276C14]
> However, we have the potential to return a REQUEST_TIMED_OUT error in 
> RPCProducerIdManager when the BrokerToControllerChannel manager times out: 
> [https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala#L236]
>  
> or when the poll returns null: 
> [https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala#L170]
> Since REQUEST_TIMED_OUT is not handled by the producer, we treat it as a 
> fatal error and the producer fails. With the default of idempotent producers, 
> this can cause more issues.
> See this stack trace from 3.0:
> {code:java}
> ERROR [Producer clientId=console-producer] Aborting producer batches due to 
> fatal error (org.apache.kafka.clients.producer.internals.Sender)
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1390)
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1294)
>   at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:658)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:650)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:256)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> Seems like the commit that introduced the changes was this one: 
> [https://github.com/apache/kafka/commit/72d108274c98dca44514007254552481c731c958]
> so we are vulnerable when the server code is on IBP 3.0 and beyond.
>  
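
A simplified sketch of the classification gap described above: only errors the
handler explicitly knows to be retriable are re-enqueued, so an unexpected
REQUEST_TIMED_OUT falls through to the fatal path. All types below are
illustrative stand-ins for the real TransactionManager logic, not its code:

```scala
// Stand-in for the InitProducerId response handling described in the ticket:
// unrecognized errors fall through to the fatal path, which is why an
// unexpected REQUEST_TIMED_OUT kills the producer.
object InitProducerIdErrorSketch {
  sealed trait Error
  case object NoError extends Error
  case object CoordinatorLoadInProgress extends Error
  case object RequestTimedOut extends Error

  sealed trait Outcome
  case object Completed extends Outcome
  case object Reenqueue extends Outcome
  final case class FatalError(message: String) extends Outcome

  // Only explicitly retriable errors are re-enqueued; everything else is fatal.
  def handle(error: Error): Outcome = error match {
    case NoError => Completed
    case CoordinatorLoadInProgress => Reenqueue // known retriable
    case RequestTimedOut => // missing from the retriable set -> fatal (the bug)
      FatalError("Unexpected error in InitProducerIdResponse; The request timed out.")
  }

  def main(args: Array[String]): Unit =
    println(handle(RequestTimedOut))
}
```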



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043700608


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-new DescribeGroupsResponse(describeGroupsResponseData)
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
-}
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val describeRequest = request.body[DescribeGroupsRequest]
-val describeGroupsResponseData = new DescribeGroupsResponseData()
+val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
+val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+  describeRequest.data.groups.size
+)
 
 describeRequest.data.groups.forEach { groupId =>
   if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+  groupId,
+  Errors.GROUP_AUTHORIZATION_FAILED
+))
   } else {
-val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-val members = summary.members.map { member =>
-  new DescribeGroupsResponseData.DescribedGroupMember()
-.setMemberId(member.memberId)
-.setGroupInstanceId(member.groupInstanceId.orNull)
-.setClientId(member.clientId)
-.setClientHost(member.clientHost)
-.setMemberAssignment(member.assignment)
-.setMemberMetadata(member.metadata)
-}
-
-val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-  .setErrorCode(error.code)
-  .setGroupId(groupId)
-  .setGroupState(summary.state)
-  .setProtocolType(summary.protocolType)
-  .setProtocolData(summary.protocol)
-  .setMembers(members.asJava)
-
-if (request.header.apiVersion >= 3) {
-  if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
-
describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, 
new Resource(ResourceType.GROUP, groupId)))
+futures += newGroupCoordinator.describeGroup(

Review Comment:
   I wonder if it would be simpler to pass through all groups that we want to 
describe. Then we just have a single future.
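
   A sketch of that single-future alternative: combine the per-group futures
   into one future that completes once every group has been described. The
   types here are illustrative, not KafkaApis itself:

   ```scala
   // Combine one CompletableFuture per group into a single future that
   // completes when all groups have been described. Illustrative only.
   import java.util.concurrent.CompletableFuture

   object FanInSketch {
     final case class DescribedGroup(groupId: String, state: String)

     def describeGroup(groupId: String): CompletableFuture[DescribedGroup] =
       CompletableFuture.supplyAsync(() => DescribedGroup(groupId, "Stable"))

     def describeAll(groupIds: Seq[String]): CompletableFuture[Seq[DescribedGroup]] = {
       val futures = groupIds.map(describeGroup)
       CompletableFuture
         .allOf(futures: _*)
         .thenApply(_ => futures.map(_.join())) // all done; join() cannot block here
     }

     def main(args: Array[String]): Unit =
       println(describeAll(Seq("g1", "g2")).join())
   }
   ```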



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-new DescribeGroupsResponse(describeGroupsResponseData)
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
-}
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val describeRequest = request.body[DescribeGroupsRequest]
-val describeGroupsResponseData = new DescribeGroupsResponseData()
+val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
+val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+  describeRequest.data.groups.size
+)
 
 describeRequest.data.groups.forEach { groupId =>
   if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+  groupId,
+  Errors.GROUP_AUTHORIZATION_FAILED
+))
   } else {
-val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-val members = summary.members.map { member =>
-  new DescribeGroupsResponseData.DescribedGroupMember()
-.setMemberId(member.memberId)
-.setGroupInstanceId(member.groupInstanceId.orNull)
-.setClientId(member.clientId)
-.setClientHost(member.clientHost)
-.setMemberAssignment(member.assignment)
-

[GitHub] [kafka] mumrah commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller

2022-12-08 Thread GitBox


mumrah commented on code in PR #12965:
URL: https://github.com/apache/kafka/pull/12965#discussion_r1043473436


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -81,6 +84,19 @@ object KafkaServer {
 clientConfig
   }
 
+  def zkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: 
ZKClientConfig): KafkaZkClient = {

Review Comment:
   This was meant to go in with #12946. It's a small refactor to make it easy 
to create a KafkaZkClient from ControllerServer (which is TBD)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1043680891


##
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##
@@ -166,15 +172,15 @@ class BrokerMetadataListenerTest {
   image = newImage
 }
 
-override def publishedOffset: Long = -1
+override def publishedEndOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
   private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
   endOffset: Long): Unit = {
-(0 to 1).foreach { _ =>
+(0 until 1).foreach { _ =>
   listener.handleCommit(
 RecordTestUtils.mockBatchReader(
   endOffset,

Review Comment:
   Are you referring to the removing replicas changes here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs

2022-12-08 Thread GitBox


hachikuji commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1043679688


##
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##
@@ -166,15 +172,15 @@ class BrokerMetadataListenerTest {
   image = newImage
 }
 
-override def publishedOffset: Long = -1
+override def publishedEndOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
   private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
   endOffset: Long): Unit = {

Review Comment:
   I changed the method here to make `endOffset` exclusive, so it seems ok now? 
Or am I missing something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #12946: KAFKA-14427 ZK client support for migrations

2022-12-08 Thread GitBox


mumrah merged PR #12946:
URL: https://github.com/apache/kafka/pull/12946


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12964: MINOR: Introduce MetadataProvenance and ImageReWriter

2022-12-08 Thread GitBox


cmccabe commented on code in PR #12964:
URL: https://github.com/apache/kafka/pull/12964#discussion_r1043673991


##
metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+import java.util.Objects;
+
+
+/**
+ * Information about the source of a metadata image.
+ */
+public final class MetadataProvenance {
+public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, 
0, 0L);
+
+private final long offset;
+private final int epoch;
+private final long lastContainedLogTimeMs;
+
+public MetadataProvenance(
+long offset,
+int epoch,
+long lastContainedLogTimeMs
+) {
+this.offset = offset;
+this.epoch = epoch;
+this.lastContainedLogTimeMs = lastContainedLogTimeMs;
+}
+
+public OffsetAndEpoch offsetAndEpoch() {
+return new OffsetAndEpoch(offset, epoch);
+}
+
+public long offset() {
+return offset;
+}
+
+public int epoch() {
+return epoch;
+}
+
+public long lastContainedLogTimeMs() {
+return lastContainedLogTimeMs;
+}
+
+public String snapshotName() {

Review Comment:
   It is used in the new metadata loader, which is not part of this PR. It is 
the name that a snapshot with the given provenance (offset, epoch) would have. 
I will add a Javadoc comment.
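
   For a rough picture of the kind of name in question, a sketch that derives
   a snapshot file name from (offset, epoch). The zero-padding widths and the
   `.checkpoint` suffix are assumptions about the on-disk convention, not code
   from this PR:

   ```scala
   // Illustrative only: derive a snapshot file name from its provenance.
   // The padding widths and suffix below are assumptions, not this PR's code.
   object SnapshotNameSketch {
     def snapshotName(offset: Long, epoch: Int): String =
       f"$offset%020d-$epoch%010d.checkpoint"

     def main(args: Array[String]): Unit =
       println(snapshotName(offset = 1234L, epoch = 2))
       // 00000000000000001234-0000000002.checkpoint
   }
   ```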



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #12946: KAFKA-14427 ZK client support for migrations

2022-12-08 Thread GitBox


mumrah commented on PR #12946:
URL: https://github.com/apache/kafka/pull/12946#issuecomment-1343113991

   Latest build looks good. A few unrelated flaky tests:
   
   ```
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
 | 1 min 2 sec | 1
   Build / JDK 8 and Scala 2.12 / 
kafka.api.TransactionsExpirationTest.testTransactionAfterProducerIdExpires(String).quorum=kraft
 | 8.8 sec | 1
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[1] true
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12964: MINOR: Introduce MetadataProvenance and ImageReWriter

2022-12-08 Thread GitBox


cmccabe commented on code in PR #12964:
URL: https://github.com/apache/kafka/pull/12964#discussion_r1043672488


##
metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+import java.util.Objects;
+
+
+/**
+ * Information about the source of a metadata image.
+ */
+public final class MetadataProvenance {
+public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, 
0, 0L);

Review Comment:
   yes, that's fair. I suppose they should all be -1 because 0 implies that you 
have read and seen offset / epoch / timestamp 0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12964: MINOR: Introduce MetadataProvenance and ImageReWriter

2022-12-08 Thread GitBox


cmccabe commented on code in PR #12964:
URL: https://github.com/apache/kafka/pull/12964#discussion_r1043668840


##
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java:
##
@@ -152,19 +153,7 @@ public Optional metadataVersionChanged() {
 }
 }
 
-public void read(long highestOffset, int highestEpoch, 
Iterator> reader) {
-while (reader.hasNext()) {
-List batch = reader.next();
-for (ApiMessageAndVersion messageAndVersion : batch) {
-replay(highestOffset, highestEpoch, 
messageAndVersion.message());
-}
-}
-}
-
-public void replay(long offset, int epoch, ApiMessage record) {
-highestOffset = offset;
-highestEpoch = epoch;
-
+public void replay(ApiMessage record) {

Review Comment:
   Offset and epoch are a bit silly in cases like loading a snapshot, 
where all records have the same offset and epoch.
   
   This information is better tracked by the metadata loader and if we need it, 
we always have it there. We also have other information such as the source of 
the records and so on, in the loader.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya opened a new pull request, #12967: MINOR: Replace ArrayBuffer with ListBuffer for better performance

2022-12-08 Thread GitBox


divijvaidya opened a new pull request, #12967:
URL: https://github.com/apache/kafka/pull/12967

   `ListBuffer` should be preferred over `ArrayBuffer` in cases where:
   1. No indexing is required.
   2. The data structure is converted to Java's List from Scala.
   
   `ListBuffer` has a [guaranteed constant time 
performance](https://docs.scala-lang.org/overviews/collections-2.13/performance-characteristics.html)
 for append operations, which makes it an attractive data structure for cases 
when we create a list and append multiple entries to it.
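
   A small sketch of the usage pattern this targets: append-only construction
   followed by conversion to a Java `List` (illustrative, not the PR's code):

   ```scala
   // Append-only construction followed by conversion to a Java List -- the
   // pattern where ListBuffer's constant-time append pays off.
   import scala.collection.mutable.ListBuffer
   import scala.jdk.CollectionConverters._

   object ListBufferSketch {
     def main(args: Array[String]): Unit = {
       val partitions = new ListBuffer[String]
       for (i <- 0 until 3)
         partitions += s"topic-$i" // O(1) append, no array copying on growth
       val javaList: java.util.List[String] = partitions.asJava // no indexing needed
       println(javaList)
     }
   }
   ```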
   
   This PR modifies usage of `ArrayBuffer` to `ListBuffer` in a latency-sensitive 
code path. To prove the usefulness of this change, I am attaching a CPU profile 
for an [open messaging 
benchmark](https://openmessaging.cloud/docs/benchmarks/). Please note that the 
self-time of `ReplicaManager.fetchMessages()` is reduced after this change 
(depicted by the empty space at the top of the frame for 
`ReplicaManager.fetchMessages()`).
   
   **Before this PR**

   ![Screenshot 2022-12-08 at 15 30 
24](https://user-images.githubusercontent.com/71267/206517187-6ebbcad5-00b5-4706-a2b8-5c64dce71d9a.png)
   
   **After this PR**
   ![Screenshot 2022-12-08 at 15 30 
37](https://user-images.githubusercontent.com/71267/206517243-97938193-449d-4e46-9c32-fb9bc82c0469.png)
   
   
   Note: This PR piggybacks some general Scala cleanup regarding the usage of 
Optionals.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14340) KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates

2022-12-08 Thread Bart Van Bos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644883#comment-17644883
 ] 

Bart Van Bos commented on KAFKA-14340:
--

Duplicate of https://issues.apache.org/jira/browse/KAFKA-12807

> KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates 
> ---
>
> Key: KAFKA-14340
> URL: https://issues.apache.org/jira/browse/KAFKA-14340
> Project: Kafka
>  Issue Type: Wish
>  Components: security
>Affects Versions: 3.3.1
>Reporter: Bart Van Bos
>Priority: Minor
>
> Istio and other *SPIFFE*-based systems use X509 client certificates to 
> provide workload ID. Kafka currently does support client-cert-based AuthN/Z 
> and mapping to ACLs, but only by inspecting the CN field within a client 
> certificate.
> There are several POC projects out there that implement a bespoke 
> _KafkaPrincipalBuilder_ for this purpose. Two examples include
>  * [https://github.com/traiana/kafka-spiffe-principal]
>  * [https://github.com/boeboe/kafka-istio-principal-builder] (written by 
> myself)
> The gist is to introspect X509-based client certificates, look for a URI-based 
> SPIFFE entry in the SAN extension, and return that as a principal that 
> can be used to write ACL rules.
> This KIP request is to include this functionality in Kafka proper so 
> end-users don't need to load custom, non-vetted Java 
> classes/implementations.
> The main use case for me is the many Istio customers who want to be able to 
> leverage SPIFFE-based IDs for their Kafka ACL authorization. This eliminates 
> the need for sidecars on the broker side, custom _EnvoyFilters_, and other 
> less optimal implementations for integrating Kafka into an Istio-secured 
> Kubernetes environment. 
> I believe this would make for a better integration between the Istio/SPIFFE 
> and Kafka ecosystems.
>  
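
For reference, the gist described above, written as a `KafkaPrincipalBuilder`, looks roughly like this (a sketch modeled on the linked projects, not vetted code; error handling is simplified):

{code:java}
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;

public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder {
    // RFC 5280 GeneralName type 6 = uniformResourceIdentifier
    private static final int SAN_URI = 6;

    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        try {
            SslAuthenticationContext ssl = (SslAuthenticationContext) context;
            X509Certificate cert =
                (X509Certificate) ssl.session().getPeerCertificates()[0];
            Collection<List<?>> sans = cert.getSubjectAlternativeNames();
            if (sans != null) {
                for (List<?> san : sans) {
                    String name = (String) san.get(1);
                    if ((Integer) san.get(0) == SAN_URI && name.startsWith("spiffe://")) {
                        return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, name);
                    }
                }
            }
        } catch (Exception e) {
            // fall through to anonymous
        }
        return KafkaPrincipal.ANONYMOUS;
    }
}
{code}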



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14352) Support rack-aware partition assignment for Kafka consumers

2022-12-08 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-14352.

  Reviewer: David Jacot
Resolution: Fixed

This includes protocol changes for rack-aware assignment. Default assignors 
will be made rack-aware under follow-on tickets in the next release.

> Support rack-aware partition assignment for Kafka consumers
> ---
>
> Key: KAFKA-14352
> URL: https://issues.apache.org/jira/browse/KAFKA-14352
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.4.0
>
>
> KIP-392 added support for consumers to fetch from the replica in their local 
> rack. To benefit from locality, consumers need to be assigned partitions 
> which have a replica in the same rack. This works well when replication 
> factor is the same as the number of racks, since every rack would then have a 
> replica with rack-aware replica assignment. If the number of racks is higher, 
> some racks may not have replicas of some partitions and hence consumers in 
> these racks will have to fetch from another rack. It will be useful to 
> propagate rack in the subscription metadata for consumers and provide a 
> rack-aware partition assignor for consumers.
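
For context, consumers already advertise their rack via the {{client.rack}} config (added by KIP-392 for fetch-from-follower); this work propagates that rack in the subscription metadata so assignors can use it. A minimal config sketch (bootstrap servers, group, and rack name are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");               // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// The rack of this client; matched against replica racks for locality.
props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a");              // placeholder
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
{code}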



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #12959: MINOR: Fix various memory leaks in tests

2022-12-08 Thread GitBox


cadonna commented on code in PR #12959:
URL: https://github.com/apache/kafka/pull/12959#discussion_r1043244790


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java:
##
@@ -130,6 +134,8 @@ public void shouldDeleteKeyAndPropagateV0() {
                 .withValue(new Change<>(newValue, oldValue)),
             forwarded.get(0).record()
         );
+
+        stateStore.close();

Review Comment:
   Why do you close the state store in the test here but not in the other tests?



##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##
@@ -148,6 +148,11 @@ private void verifyDBOptionsMethodCall(final Method method) throws Exception {
     assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
     assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
         matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):"));
+} finally {
+    reset(mockedDbOptions);
+    mockedDbOptions.close();
+    replay(mockedDbOptions);
+    optionsFacadeDbOptions.close();

Review Comment:
   I think it would be enough to just call `optionsFacadeDbOptions.close();`. You 
already verify that `optionsFacadeDbOptions.close();` calls 
`mockedDbOptions.close();` in the try-block. No need to verify it again.
   Alternatively, you could add `close` to the list of ignored methods 
(`ignoreMethods`) and verify as you did. However, you would need to add 
`verify(mockedDbOptions)` after `optionsFacadeDbOptions.close()`, otherwise 
nothing is verified (see the sketch below). 
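   
   i.e., something like this (a sketch of the alternative, assuming EasyMock's record/replay/verify flow):
   
   ```java
   reset(mockedDbOptions);
   mockedDbOptions.close();          // record the expectation: close() is called once
   replay(mockedDbOptions);
   optionsFacadeDbOptions.close();   // exercise the facade
   verify(mockedDbOptions);          // without this line, the expectation is never checked
   ```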



##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##
@@ -333,23 +343,23 @@ public void shouldLogWarningWhenSettingWalOptions() throws Exception {
 
     try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class)) {
 
-        final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter
-            = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions());
-
-        for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) {
-            if (walRelatedMethods.contains(method.getName())) {
-                method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes()));
+        try (RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter =
+                 new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions())) {
+            for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) {
+                if (walRelatedMethods.contains(method.getName())) {
+                    method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes()));
+                }
             }
-        }
 
-        final List<String> walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds");
+            final List<String> walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds");
 
-        final Set<String> logMessages = appender.getEvents().stream()
-            .filter(e -> e.getLevel().equals("WARN"))
-            .map(LogCaptureAppender.Event::getMessage)
-            .collect(Collectors.toSet());
+            final Set<String> logMessages = appender.getEvents().stream()
+                .filter(e -> e.getLevel().equals("WARN"))
+                .map(LogCaptureAppender.Event::getMessage)
+                .collect(Collectors.toSet());
 
-        walOptions.forEach(option -> assertThat(logMessages, hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB. Setting option '%s' will be ignored", option))));
+            walOptions.forEach(option -> assertThat(logMessages, hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB. Setting option '%s' will be ignored", option))));
+        }
 

Review Comment:
   ```suggestion
   ```



##
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java:
##
@@ -480,6 +480,7 @@ public void shouldCreateWriteBatches() {
     assertEquals(2, writeBatchMap.size());
     for (final WriteBatch batch : writeBatchMap.values()) {
         assertEquals(1, batch.count());
+        batch.close();

Review Comment:
   

[GitHub] [kafka] mumrah commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller

2022-12-08 Thread GitBox


mumrah commented on code in PR #12965:
URL: https://github.com/apache/kafka/pull/12965#discussion_r1043473436


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -81,6 +84,19 @@ object KafkaServer {
 clientConfig
   }
 
+  def zkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: 
ZKClientConfig): KafkaZkClient = {

Review Comment:
   This is coming from #12946 which is getting merged first



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #12964: MINOR: Introduce MetadataProvenance and ImageReWriter

2022-12-08 Thread GitBox


mumrah commented on code in PR #12964:
URL: https://github.com/apache/kafka/pull/12964#discussion_r1043452505


##
metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+import java.util.Objects;
+
+
+/**
+ * Information about the source of a metadata image.
+ */
+public final class MetadataProvenance {
+public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, 0, 0L);
+
+private final long offset;
+private final int epoch;
+private final long lastContainedLogTimeMs;
+
+public MetadataProvenance(
+long offset,
+int epoch,
+long lastContainedLogTimeMs
+) {
+this.offset = offset;
+this.epoch = epoch;
+this.lastContainedLogTimeMs = lastContainedLogTimeMs;
+}
+
+public OffsetAndEpoch offsetAndEpoch() {
+return new OffsetAndEpoch(offset, epoch);
+}
+
+public long offset() {
+return offset;
+}
+
+public int epoch() {
+return epoch;
+}
+
+public long lastContainedLogTimeMs() {
+return lastContainedLogTimeMs;
+}
+
+public String snapshotName() {

Review Comment:
   Is this used anywhere? Is it meant to match the name of the snapshot file?



##
metadata/src/main/java/org/apache/kafka/image/MetadataVersionChange.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.Objects;
+
+
+/**
+ * A change in the MetadataVersion.
+ */
+public final class MetadataVersionChange {

Review Comment:
   Would it be useful to have methods like "isUpgrade" or "isDowngrade" here?
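   
   Something like this, perhaps (sketch only; the `oldVersion`/`newVersion` field names are assumptions):
   
   ```java
   public boolean isUpgrade() {
       return newVersion.isAtLeast(oldVersion) && !newVersion.equals(oldVersion);
   }
   
   public boolean isDowngrade() {
       return oldVersion.isAtLeast(newVersion) && !newVersion.equals(oldVersion);
   }
   ```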



##
metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+import java.util.Objects;
+
+
+/**
+ * Information about the source of a metadata image.
+ */
+public final class MetadataProvenance {
+public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, 0, 0L);

Review Comment:
   We should probably make these sentinels the same as the initial values used 
in BrokerMetadataListener. That way, if someone reads `provenance()` off the 
listener before we have processed anything, it would equal `EMPTY`.



##
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java:
##
@@ -152,19 +153,7 @@ public 

[jira] [Updated] (KAFKA-14456) Fix AdminUtils startIndex for rack aware partition creations

2022-12-08 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14456:
-
Description: 
When new partitions are added/created we calculate a start index based off all 
the brokers here 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L270.]
 That start index is passed through to AdminUtils and is used to find a 
starting position in the list of brokers for making assignments. However, when 
we make rack-aware assignments we use that index into a rack-alternating list 
here 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L160.]
 The meaning of the index gets lost: the index into the full list of brokers 
doesn't seem to have the same meaning as the index into a rack-alternating list. 

I discovered this when I published 
[https://github.com/apache/kafka/pull/12943/files.] In that PR I added a test 
testRackAwarePartitionAssignment which does not work for ZK mode.

  was:
When new partitions are added/created we calculate a start index based off all 
the brokers here 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L270.]
 That start index is passed through to AdminUtils and is used to find a 
starting position in the list of brokers for making assignments. However, when 
we make rack aware assignments we use that index into a rack alternating list 
here 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L160.]
 The meaning of the index gets lost: the index into the full list of brokers 
doesnt seem to have the same meaning as the index into a rack alternating list. 

 

I discovered this when I published 
[https://github.com/apache/kafka/pull/12943/files.] In that PR I added a test 
testRackAwarePartitionAssignment which does not work for ZK mode.


> Fix AdminUtils startIndex for rack aware partition creations 
> -
>
> Key: KAFKA-14456
> URL: https://issues.apache.org/jira/browse/KAFKA-14456
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Priority: Major
>
> When new partitions are added/created we calculate a start index based off 
> all the brokers here 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L270.]
>  That start index is passed through to AdminUtils and is used to find a 
> starting position in the list of brokers for making assignments. However, 
> when we make rack-aware assignments we use that index into a rack-alternating 
> list here 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L160.]
>  The meaning of the index gets lost: the index into the full list of brokers 
> doesn't seem to have the same meaning as the index into a rack-alternating 
> list. 
> I discovered this when I published 
> [https://github.com/apache/kafka/pull/12943/files.] In that PR I added a test 
> testRackAwarePartitionAssignment which does not work for ZK mode.
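
To illustrate the mismatch with a toy example (not the actual AdminUtils code):

{code:scala}
// Six brokers: 0,1 on rack A; 2,3 on rack B; 4,5 on rack C.
val brokers = Seq(0, 1, 2, 3, 4, 5)           // natural broker order
val rackAlternated = Seq(0, 2, 4, 1, 3, 5)    // racks alternated: A,B,C,A,B,C
val startIndex = 3                            // computed against `brokers`
println(brokers(startIndex))                  // 3
println(rackAlternated(startIndex))           // 1 -- same index, different broker
{code}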



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14456) Fix AdminUtils startIndex for rack aware partition creations

2022-12-08 Thread Andrew Grant (Jira)
Andrew Grant created KAFKA-14456:


 Summary: Fix AdminUtils startIndex for rack aware partition 
creations 
 Key: KAFKA-14456
 URL: https://issues.apache.org/jira/browse/KAFKA-14456
 Project: Kafka
  Issue Type: Improvement
Reporter: Andrew Grant


When new partitions are added/created we calculate a start index based off all 
the brokers here 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L270.]
 That start index is passed through to AdminUtils and is used to find a 
starting position in the list of brokers for making assignments. However, when 
we make rack-aware assignments we use that index into a rack-alternating list 
here 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L160.]
 The meaning of the index gets lost: the index into the full list of brokers 
doesn't seem to have the same meaning as the index into a rack-alternating list. 

I discovered this when I published 
[https://github.com/apache/kafka/pull/12943/files.] In that PR I added a test 
testRackAwarePartitionAssignment which does not work for ZK mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lucasbru commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory

2022-12-08 Thread GitBox


lucasbru commented on code in PR #12935:
URL: https://github.com/apache/kafka/pull/12935#discussion_r1043442711


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
@@ -526,6 +521,9 @@ public synchronized void close() {
         fOptions.close();
         filter.close();
         cache.close();
+        if (statistics != null) {
+            statistics.close();
+        }
 

Review Comment:
   I think you documented it already in the javadoc for `close` 
https://github.com/apache/kafka/pull/6697



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on a diff in pull request #12962: KAFKA-14318: KIP-878, Introduce partition autoscaling configs

2022-12-08 Thread GitBox


bbejeck commented on code in PR #12962:
URL: https://github.com/apache/kafka/pull/12962#discussion_r1043422635


##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -1368,6 +1370,20 @@ public void shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() {
 assertEquals(0, configs.size());
 }
 
+@Test
+public void shouldEnablePartitionAutoscaling() {
+props.put("partition.autoscaling.enabled", true);
+final StreamsConfig config = new StreamsConfig(props);
+assertTrue(config.getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG));
+}
+
+@Test
+public void shouldSetPartitionAutoscalingTimeout() {
+props.put("partition.autoscaling.timeout.ms", 0L);
+final StreamsConfig config = new StreamsConfig(props);
+assertThat(config.getBoolean(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG), is(0L));

Review Comment:
   oops! `config.getLong` here
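   
   i.e., presumably:
   
   ```java
   assertThat(config.getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG), is(0L));
   ```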



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] justinrlee commented on pull request #12797: MINOR: Remove requirement to specify --bootstrap-server in utility scripts if specified in property file

2022-12-08 Thread GitBox


justinrlee commented on PR #12797:
URL: https://github.com/apache/kafka/pull/12797#issuecomment-1342751002

   Okay, I can remove the argument section. If I do that, do you think we can get 
away without a KIP?
   
   What are your thoughts on my error state and the way the CLI reacts 
without bootstrap.server?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory

2022-12-08 Thread GitBox


cadonna commented on PR #12935:
URL: https://github.com/apache/kafka/pull/12935#issuecomment-1342749417

   I agree! I will backport this commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2022-12-08 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644810#comment-17644810
 ] 

Mickael Maison commented on KAFKA-12635:


The fix only ensured that new offsets will be correct. If you have existing 
negative offsets, you either have to wait until a new offset is emitted, or you 
can manually update them, for example using the Admin client or the 
kafka-consumer-groups tool (see the sketch below).
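
A sketch of the manual route via the Admin client (group, topic, and target offset are placeholders; {{kafka-consumer-groups.sh --reset-offsets}} achieves the same from the command line):

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

void fixOffsets() throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "target:9092");                     // placeholder
    try (Admin admin = Admin.create(props)) {
        TopicPartition tp = new TopicPartition("mirrored-topic", 0);   // placeholder
        // Overwrite the bad literal offset with the translated one (here: 0).
        admin.alterConsumerGroupOffsets("my-group",                    // placeholder
            Map.of(tp, new OffsetAndMetadata(0L))).all().get();
    }
}
{code}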

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> ---
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Frank Yi
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.3.0
>
> Attachments: image-2022-11-02-11-53-33-329.png, 
> image-2022-11-02-11-56-34-994.png, screenshot-1.png
>
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = 
> true".
> If a source partition is empty, but the source consumer group's offset for 
> that partition is non-zero, then Mirrormaker sets the target consumer group's 
> offset for that partition to the literal, not translated, offset of the 
> source consumer group. This state can be reached if the source consumer group 
> consumed some records that were now deleted (like by a retention policy), or 
> if Mirrormaker replication is set to start at "latest". This bug causes the 
> target consumer group's lag for that partition to be negative and breaks 
> offset sync for that partition until lag is positive.
> The correct behavior when the source partition is empty would be to set the 
> target offset to the translated offset, not literal offset, which in this 
> case would always be 0. 
> Original email thread on this issue: 
> https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14455) Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2022-12-08 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14455:
--

 Summary: Kafka Connect create and update REST APIs should surface 
failures while writing to the config topic
 Key: KAFKA-14455
 URL: https://issues.apache.org/jira/browse/KAFKA-14455
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


Kafka Connect's `POST /connectors` and `PUT /connectors/\{connector}/config` 
REST APIs internally simply write a message to the Connect cluster's internal 
config topic (which is then processed asynchronously by the herder). However, 
no callback is passed to the producer's send method and there is no error 
handling in place for producer send failures (see 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716]
 / 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]).

Consider one such case where the Connect worker's principal doesn't have a 
WRITE ACL on the cluster's config topic. Now suppose the user submits a 
connector's configs via one of the above two APIs. The producer send 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716]
 / 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]
 won't succeed (due to a TopicAuthorizationException) but the API responses 
will be `201 Created` success responses anyway. This is a very poor UX because 
the connector will never actually be created even though the API response 
indicated success. Furthermore, this failure would only be detectable if TRACE 
logs are enabled (via [this 
log)|https://github.com/apache/kafka/blob/df29b17fc40f7c15460988d58bc652c3d66b60f8/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java]
 making it nearly impossible for users to debug. Producer callbacks should be 
used to surface write failures back to the user via the API response.
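
The rough shape of such a fix (a sketch, not the actual KafkaConfigBackingStore patch; {{callback}} stands for whatever completion hook the herder passes in):

{code:java}
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // e.g. TopicAuthorizationException: propagate the failure so the REST
        // layer can return an error instead of a spurious `201 Created`.
        callback.onCompletion(
            new ConnectException("Failed to write to the config topic", exception), null);
    } else {
        callback.onCompletion(null, null);
    }
});
{code}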



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on a diff in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

2022-12-08 Thread GitBox


satishd commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r1043330520


##
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - receives any leader and follower replica events and partition stop events and act on them
+ *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *
+ * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
+ * @param brokerId  id of the current broker.
+ * @param logDir    directory of Kafka log segments.
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+                       brokerId: Int,
+                       logDir: String) extends Logging with Closeable with KafkaMetricsGroup {
+
+  // topic ids received on leadership changes
+  private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]()
+
+  private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager()
+  private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager()
+
+  private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir)
+
+  private var closed = false
+
+  private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+    def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+      classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+        .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+      private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+      override def run(): RemoteStorageManager = {
+        if (classPath != null && classPath.trim.nonEmpty) {
+          val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+          val delegate = createDelegate(classLoader)
+          new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+        } else {
+          createDelegate(this.getClass.getClassLoader)
+        }
+      }
+    })
+  }
+
+  private def configureRSM(): Unit = {
+    val rsmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) }
+    rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    remoteLogStorageManager.configure(rsmProps)
+  }
+
+  private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = {
+    def createDelegate(classLoader: ClassLoader) = {
+      classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[RemoteLogMetadataManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] {
+      private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
+  

[GitHub] [kafka] lucasbru commented on pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory

2022-12-08 Thread GitBox


lucasbru commented on PR #12935:
URL: https://github.com/apache/kafka/pull/12935#issuecomment-1342589776

   > If there are any other/older branches you think this should go to, I'll 
leave it to @cadonna to port the fix as I suspect there could be nontrivial 
merge conflicts
   
   Yes, hmm we probably have to port this back to all branches that got the 
`RocksDB` upgrade, right?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory

2022-12-08 Thread GitBox


cadonna commented on code in PR #12935:
URL: https://github.com/apache/kafka/pull/12935#discussion_r1043248049


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
@@ -229,40 +230,34 @@ void openDB(final Map<String, Object> configs, final File stateDir) {
 
     // Setup statistics before the database is opened, otherwise the statistics are not updated
     // with the measurements from Rocks DB
-    maybeSetUpStatistics(configs);
-
+    statistics = userSpecifiedOptions.statistics();
+    if (statistics == null) {
+        if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
+            statistics = new Statistics();
+            dbOptions.setStatistics(statistics);
+        }
+        userSpecifiedStatistics = false;
+    } else {
+        userSpecifiedStatistics = true;
+    }

Review Comment:
   No comment!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon closed pull request #12631: [WIP] MINOR: building

2022-12-08 Thread GitBox


showuon closed pull request #12631: [WIP] MINOR: building
URL: https://github.com/apache/kafka/pull/12631


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac closed pull request #12897: KAFKA-14379: consumer should refresh preferred read replica on update metadata

2022-12-08 Thread GitBox


dajac closed pull request #12897: KAFKA-14379: consumer should refresh 
preferred read replica on update metadata
URL: https://github.com/apache/kafka/pull/12897


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org