[GitHub] [kafka] vamossagar12 commented on pull request #12971: KAFKA-14454: Making unique StreamsConfig for tests
vamossagar12 commented on PR #12971: URL: https://github.com/apache/kafka/pull/12971#issuecomment-1343960862 It looks like the commit failed after one of the suggestions was applied. I will push a fix for that shortly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12971: KAFKA-14454: Making unique StreamsConfig for tests
ableegoldman commented on code in PR #12971:
URL: https://github.com/apache/kafka/pull/12971#discussion_r1044153926

## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##
@@ -196,19 +202,16 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultipl
         for (final KafkaStreams stream: kafkaStreamsList) {
             stream.setUncaughtExceptionHandler(e -> {
-                assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
+                Assertions.assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
                 return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
             });
         }

         startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));

-        // Sleeping to let the processing happen inducing the failure
-        Thread.sleep(6);
+        // the streams applications should in failed state due to the IllegalStateException.
+        waitForApplicationState(Arrays.asList(streams, streamsTwo, streamsThree), KafkaStreams.State.ERROR, ofSeconds(30));

Review Comment:
   nit: no idea if we actually used the full 60s that we used to sleep, but let's continue to give it 60s

## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##
@@ -196,19 +202,16 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultipl
         for (final KafkaStreams stream: kafkaStreamsList) {
             stream.setUncaughtExceptionHandler(e -> {
-                assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
+                Assertions.assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
                 return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
             });
         }

         startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));

-        // Sleeping to let the processing happen inducing the failure
-        Thread.sleep(6);
+        // the streams applications should in failed state due to the IllegalStateException.

Review Comment:
```suggestion
        // the streams applications should have shut down into `ERROR` due to the IllegalStateException
```
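The change under review swaps a fixed `Thread.sleep` for a wait on an observable state. As a rough, Kafka-free sketch of that polling pattern (the names `PollingWait` and `waitUntil` are hypothetical and are not the `waitForApplicationState` helper from Kafka's test utilities), one might write:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration only; not part of Kafka.
final class PollingWait {
    private PollingWait() { }

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition was observed, false on timeout.
     */
    static boolean waitUntil(final BooleanSupplier condition,
                             final Duration timeout,
                             final Duration pollInterval) throws InterruptedException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            final long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Sleep for the poll interval, but never past the deadline and never for 0 ms.
            final long sleepMs = Math.min(pollInterval.toMillis(), remainingNanos / 1_000_000L);
            Thread.sleep(Math.max(1L, sleepMs));
        }
    }
}
```

Compared with a fixed sleep, the test returns as soon as the state is reached and only pays the full timeout when something is actually wrong.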
[GitHub] [kafka] ijuma commented on a diff in pull request #12951: MINOR: Move MetadataQuorumCommand from `core` to `tools`
ijuma commented on code in PR #12951:
URL: https://github.com/apache/kafka/pull/12951#discussion_r1044153255

## bin/kafka-metadata-quorum.sh:
##
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-exec $(dirname $0)/kafka-run-class.sh kafka.admin.MetadataQuorumCommand "$@"

Review Comment:
   Good catch! Fixed.
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12971: KAFKA-14454: Making unique StreamsConfig for tests
ableegoldman commented on code in PR #12971:
URL: https://github.com/apache/kafka/pull/12971#discussion_r1044152824

## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##
@@ -196,19 +202,16 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultipl
         for (final KafkaStreams stream: kafkaStreamsList) {
             stream.setUncaughtExceptionHandler(e -> {
-                assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
+                Assertions.assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());

Review Comment:
   We've been moving to `assertThat` instead of `assertEquals` and co:
```suggestion
                assertThat(e.getCause().getMessage(), equalTo("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set"));
```
[GitHub] [kafka] ijuma commented on pull request #12937: KAFKA-13881: Add Connect package infos
ijuma commented on PR #12937: URL: https://github.com/apache/kafka/pull/12937#issuecomment-1343936394 Thanks for improving the javadocs!
[GitHub] [kafka] vamossagar12 opened a new pull request, #12971: KAFKA-14454: Making unique StreamsConfig for tests
vamossagar12 opened a new pull request, #12971:
URL: https://github.com/apache/kafka/pull/12971

The test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, newly added as part of KIP-837, passes when run individually but fails when run as part of the IT class, and is therefore marked as Ignored. This seems to have been caused by the way StreamsConfig was being initialised: any new test would have used the same names, so the second test never got to the desired state. With this PR, every test gets a unique app name, which seems to have fixed the issue. The PR also includes a couple of cosmetic changes.
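To illustrate the idea of a unique app name per test, here is a small sketch that does not depend on Kafka's test utilities. `UniqueStreamsConfig` and `forTest` are hypothetical names and not the helper used in the PR; the only Kafka-specific detail is the `application.id` key, which is the value of `StreamsConfig.APPLICATION_ID_CONFIG`.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper for illustration only; not the code from the PR.
final class UniqueStreamsConfig {
    private UniqueStreamsConfig() { }

    // Builds a base config whose application.id is derived from the test class and
    // method names, so two tests in the same class do not share an application id.
    static Properties forTest(final String testClass, final String testMethod) {
        final Properties props = new Properties();
        props.put("application.id", sanitize(testClass + "-" + testMethod));
        return props;
    }

    // Lower-cases the name and replaces anything outside [a-z0-9._-] with a dash.
    private static String sanitize(final String raw) {
        return raw.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9._-]", "-");
    }
}
```

In a JUnit 5 test, the two arguments could be taken from the `TestInfo` parameter of a `@BeforeEach` method, so each test method builds its own config before starting any `KafkaStreams` instance.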
[jira] [Updated] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individua
[ https://issues.apache.org/jira/browse/KAFKA-14454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sagar Rao updated KAFKA-14454:
--
    Description: Newly added test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions as part of KIP-837 passes when run individually but fails when run as part of the IT class and hence is marked as Ignored.

  was:
Newly added test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions as part of KIP-837 passes when run individually but fails when run as part of the IT class and hence is marked as Ignored.

As part of this ticket, we can also look to move to JUnit 5 annotations for this class since it relies on JUnit 4 ones.

> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
> passes when run individually but not when is run as part of the IT
> --
>
> Key: KAFKA-14454
> URL: https://issues.apache.org/jira/browse/KAFKA-14454
> Project: Kafka
> Issue Type: Bug
> Reporter: Sagar Rao
> Assignee: Sagar Rao
> Priority: Major
>
> Newly added test
> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
> as part of KIP-837 passes when run individually but fails when run as part of the IT
> class and hence is marked as Ignored.
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] aglicacha commented on pull request #12729: KAFKA-14285: Delete quota node in zookeeper when configs are empty
aglicacha commented on PR #12729:
URL: https://github.com/apache/kafka/pull/12729#issuecomment-1343708253

> @aglicacha , there is compile error, could you take a look?

The error was caused by the unnecessary default value of the param isUserClientId in the method tryCleanQuotaNodes. I have fixed it. Thanks.
[GitHub] [kafka] jolshan commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side
jolshan commented on code in PR #12968:
URL: https://github.com/apache/kafka/pull/12968#discussion_r1043958866

## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
         val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
         if (block == null) {
-          throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")

Review Comment:
   I'd assume it's caught in the same place as the other error, since they both end up getting thrown here. I wouldn't expect to see the request-timed-out error get hit if the errors are caught.
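For readers less familiar with the pattern in this hunk: `BlockingQueue.poll(timeout, unit)` returns `null` when nothing arrives in time, and the caller decides which error to raise. The sketch below uses hypothetical names (`IdBlockSource`, `offer`, `nextBlock`) and a plain `TimeoutException`. It does not show what the PR throws in place of `REQUEST_TIMED_OUT`, since the hunk does not include the replacement line.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for illustration only; not Kafka's ProducerIdManager.
final class IdBlockSource {
    // Holds at most one prefetched block, represented here by its first ID.
    private final BlockingQueue<Long> nextBlocks = new ArrayBlockingQueue<>(1);

    // Called by whatever fetches blocks in the background.
    // Returns false if a block is already queued.
    boolean offer(final long firstIdOfBlock) {
        return nextBlocks.offer(firstIdOfBlock);
    }

    // Waits up to maxWaitMs for a block; a null result from poll means the wait timed out.
    long nextBlock(final long maxWaitMs) throws InterruptedException, TimeoutException {
        final Long block = nextBlocks.poll(maxWaitMs, TimeUnit.MILLISECONDS);
        if (block == null) {
            throw new TimeoutException("Timed out waiting for next ID block");
        }
        return block;
    }
}
```

Whichever exception type is chosen at the `null` check determines which handler upstream sees it, which is the point being discussed in this thread.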
[GitHub] [kafka] akhileshchg commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
akhileshchg commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1042776473 ## clients/src/main/resources/common/message/EnvelopeRequest.json: ## @@ -16,10 +16,11 @@ { "apiKey": 58, "type": "request", - "listeners": ["controller"], + "listeners": ["controller", "zkBroker"], "name": "EnvelopeRequest", // Request struct for forwarding. - "validVersions": "0", + // Version 1 adds the envelope request support to ZK brokers as well. + "validVersions": "0-1", Review Comment: That makes sense. Let me revert the version to 0. ## core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala: ## @@ -37,42 +38,55 @@ import scala.collection.Seq import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +case class ControllerInformation(node: Option[Node], + listenerName: ListenerName, + securityProtocol: SecurityProtocol, + saslMechanism: String, + isZkController: Boolean) + trait ControllerNodeProvider { def get(): Option[Node] - def listenerName: ListenerName - def securityProtocol: SecurityProtocol - def saslMechanism: String -} - -object MetadataCacheControllerNodeProvider { - def apply( -config: KafkaConfig, -metadataCache: kafka.server.MetadataCache - ): MetadataCacheControllerNodeProvider = { -val listenerName = config.controlPlaneListenerName - .getOrElse(config.interBrokerListenerName) - -val securityProtocol = config.controlPlaneSecurityProtocol - .getOrElse(config.interBrokerSecurityProtocol) - -new MetadataCacheControllerNodeProvider( - metadataCache, - listenerName, - securityProtocol, - config.saslMechanismInterBrokerProtocol -) - } + def getControllerInfo(): ControllerInformation } class MetadataCacheControllerNodeProvider( - val metadataCache: kafka.server.MetadataCache, - val listenerName: ListenerName, - val securityProtocol: SecurityProtocol, - val saslMechanism: String + val metadataCache: ZkMetadataCache, + val config: KafkaConfig ) extends ControllerNodeProvider { + + def 
listenerName(isZkController: Boolean): ListenerName = {

Review Comment:
   Whether this is part of the metadata cache or the node provider, we still need to check the listener name, security protocol and SASL mechanism needed to communicate with the controller, based on whether we're trying to connect to the zkController or the kraftController. I am not sure I completely understood your comment.

## core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -164,25 +184,63 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
+  @volatile private var isZkControllerThread = true
+  @volatile private var requestThread = newRequestThread()
+  @volatile private var shutdownStarted = false

   def start(): Unit = {
     requestThread.start()
+    maybeScheduleReinitializeRequestThread()
   }

   def shutdown(): Unit = {
     requestThread.shutdown()
+    shutdownStarted = false
     info(s"Broker to controller channel manager for $channelName shutdown")
   }

-  private[server] def newRequestThread = {
+  def maybeScheduleReinitializeRequestThread(): Unit = {
+    // If migration is enabled for zkBroker, then we might see controller change from zk to kraft
+    // and vice-versa. This periodic task takes care of setting the right channel when such
+    // controller change is noticed.
+    if (config.migrationEnabled && config.requiresZookeeper) {

Review Comment:
   I tried to swap out just the networkclient in interBrokerSendThread. Let me know how it looks now.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side
hachikuji commented on code in PR #12968:
URL: https://github.com/apache/kafka/pull/12968#discussion_r1043942073

## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
         val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
         if (block == null) {
-          throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")

Review Comment:
   Where does this exception get caught? I wonder if it reaches the "unexpected error" handling in `KafkaApis`.
[jira] [Updated] (KAFKA-14340) KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates
[ https://issues.apache.org/jira/browse/KAFKA-14340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bart Van Bos updated KAFKA-14340:
-
    Description:
Istio and other *SPIFFE* based systems use X509 Client Certificates to provide workload ID. Kafka currently does support Client Cert based AuthN/Z and mapping to ACL, but only by inspecting the CN field within a Client Certificate.
 * [https://spiffe.io/docs/latest/spiffe-about/spiffe-concepts/#spiffe-id]

There are several POC projects out there that provide a bespoke _KafkaPrincipalBuilder_ implementation for this purpose. Two examples include
 * [https://github.com/traiana/kafka-spiffe-principal]
 * [https://github.com/boeboe/kafka-istio-principal-builder] (written by myself)

The gist is to introspect X509 based client certificates, look for a URI based SPIFFE entry in the SAN extension and return that as a principal that can be used to write ACL rules.

This KIP request is to include this functionality into Kafka's main functionality so end-users don't need to load custom and non-vetted java classes/implementations.

My main use case is that many Istio customers have expressed the wish to leverage SPIFFE based IDs for their Kafka ACL Authorization. This eliminates the need for sidecars on the broker side or custom _EnvoyFilters_ and other less optimal implementations to integrate Kafka into an Istio secured Kubernetes environment.

I believe this would make for a better integration between the Istio/SPIFFE and Kafka ecosystems.

  was:
Istio and other *SPIFFE* based systems use X509 Client Certificates to provide workload ID. Kafka currently does support Client Cert based AuthN/Z and mapping to ACL, but only by inspecting the CN field within a Client Certificate.

There are several POC projects out there that provide a bespoke _KafkaPrincipalBuilder_ implementation for this purpose. Two examples include
 * [https://github.com/traiana/kafka-spiffe-principal]
 * [https://github.com/boeboe/kafka-istio-principal-builder] (written by myself)

The gist is to introspect X509 based client certificates, look for a URI based SPIFFE entry in the SAN extension and return that as a principal that can be used to write ACL rules.

This KIP request is to include this functionality into Kafka's main functionality so end-users don't need to load custom and non-vetted java classes/implementations.

My main use case is that many Istio customers have expressed the wish to leverage SPIFFE based IDs for their Kafka ACL Authorization. This eliminates the need for sidecars on the broker side or custom _EnvoyFilters_ and other less optimal implementations to integrate Kafka into an Istio secured Kubernetes environment.

I believe this would make for a better integration between the Istio/SPIFFE and Kafka ecosystems.

> KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates
> ---
>
> Key: KAFKA-14340
> URL: https://issues.apache.org/jira/browse/KAFKA-14340
> Project: Kafka
> Issue Type: Wish
> Components: security
> Affects Versions: 3.3.1
> Reporter: Bart Van Bos
> Assignee: Bart Van Bos
> Priority: Minor
>
> Istio and other *SPIFFE* based systems use X509 Client Certificates to
> provide workload ID. Kafka currently does support Client Cert based AuthN/Z
> and mapping to ACL, but only by inspecting the CN field within a Client
> Certificate.
> * [https://spiffe.io/docs/latest/spiffe-about/spiffe-concepts/#spiffe-id]
> There are several POC projects out there that provide a bespoke
> _KafkaPrincipalBuilder_ implementation for this purpose. Two examples include
> * [https://github.com/traiana/kafka-spiffe-principal]
> * [https://github.com/boeboe/kafka-istio-principal-builder] (written by
> myself)
> The gist is to introspect X509 based client certificates, look for a URI
> based SPIFFE entry in the SAN extension and return that as a principal that
> can be used to write ACL rules.
> This KIP request is to include this functionality into Kafka's main
> functionality so end-users don't need to load custom and non-vetted java
> classes/implementations.
> My main use case is that many Istio customers have expressed the wish to
> leverage SPIFFE based IDs for their Kafka ACL Authorization. This eliminates
> the need for sidecars on the broker side or custom _EnvoyFilters_ and other
> less optimal implementations to integrate Kafka into an Istio secured
> Kubernetes environment.
> I believe this would make for a better integration between the Istio/SPIFFE
> and Kafka ecosystems.
>
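The gist described in the issue (look for a URI-type SAN entry using the `spiffe://` scheme and use it as the principal name) can be sketched with the JDK alone. `SpiffeSan` and `findSpiffeId` are hypothetical names; this is neither the code from the two linked projects nor a Kafka API. It assumes only `X509Certificate.getSubjectAlternativeNames()`, which returns each entry as a two-element list of an `Integer` type and a value, where type 6 denotes a URI.

```java
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Kafka.
final class SpiffeSan {
    // GeneralName tag for uniformResourceIdentifier in the SAN extension (RFC 5280).
    private static final int SAN_TYPE_URI = 6;

    private SpiffeSan() { }

    // Convenience overload for a real client certificate.
    static Optional<String> findSpiffeId(final X509Certificate cert) throws CertificateParsingException {
        return findSpiffeId(cert.getSubjectAlternativeNames());
    }

    // Scans SAN entries (each a [type, value] pair) and returns the first spiffe:// URI.
    static Optional<String> findSpiffeId(final Collection<List<?>> sanEntries) {
        if (sanEntries == null) {
            return Optional.empty(); // the certificate carries no SAN extension
        }
        for (final List<?> entry : sanEntries) {
            if (entry.size() >= 2
                    && entry.get(0) instanceof Integer
                    && (Integer) entry.get(0) == SAN_TYPE_URI
                    && entry.get(1) instanceof String
                    && ((String) entry.get(1)).startsWith("spiffe://")) {
                return Optional.of((String) entry.get(1));
            }
        }
        return Optional.empty();
    }
}
```

A custom principal builder could call such a helper on the peer certificate and fall back to the existing CN-based behaviour when the result is empty; how that would be wired into Kafka is left open here.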
[jira] [Assigned] (KAFKA-14340) KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates
[ https://issues.apache.org/jira/browse/KAFKA-14340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bart Van Bos reassigned KAFKA-14340:

    Assignee: Bart Van Bos

> KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates
> ---
>
> Key: KAFKA-14340
> URL: https://issues.apache.org/jira/browse/KAFKA-14340
> Project: Kafka
> Issue Type: Wish
> Components: security
> Affects Versions: 3.3.1
> Reporter: Bart Van Bos
> Assignee: Bart Van Bos
> Priority: Minor
>
> Istio and other *SPIFFE* based systems use X509 Client Certificates to
> provide workload ID. Kafka currently does support Client Cert based AuthN/Z
> and mapping to ACL, but only by inspecting the CN field within a Client
> Certificate.
> There are several POC projects out there that provide a bespoke
> _KafkaPrincipalBuilder_ implementation for this purpose. Two examples include
> * [https://github.com/traiana/kafka-spiffe-principal]
> * [https://github.com/boeboe/kafka-istio-principal-builder] (written by
> myself)
> The gist is to introspect X509 based client certificates, look for a URI
> based SPIFFE entry in the SAN extension and return that as a principal that
> can be used to write ACL rules.
> This KIP request is to include this functionality into Kafka's main
> functionality so end-users don't need to load custom and non-vetted java
> classes/implementations.
> My main use case is that many Istio customers have expressed the wish to
> leverage SPIFFE based IDs for their Kafka ACL Authorization. This eliminates
> the need for sidecars on the broker side or custom _EnvoyFilters_ and other
> less optimal implementations to integrate Kafka into an Istio secured
> Kubernetes environment.
> I believe this would make for a better integration between the Istio/SPIFFE
> and Kafka ecosystems.
>
[GitHub] [kafka] philipnee commented on a diff in pull request #12862: Consumer refator find coordinator
philipnee commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1043939575 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; + +/** + * Handles the timing of the next FindCoordinatorRequest based on the {@link RequestState}. It checks for: + * 1. If there's an existing coordinator. + * 2. If there is an inflight request + * 3. If the backoff timer has expired + * + * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait + * timer, or a singleton list of + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}. + * + * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which + * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node + * will be marked {@code null} upon receiving a failure. 
+ */ +public class CoordinatorRequestManager implements RequestManager { + +private final Logger log; +private final Time time; +private final ErrorEventHandler errorHandler; +private final long rebalanceTimeoutMs; +private final String groupId; + +private final RequestState coordinatorRequestState; +private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while +private Node coordinator; + + +public CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ConsumerConfig config, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs) { +Objects.requireNonNull(groupId); +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.coordinatorRequestState = new RequestState(config); +} + +// Visible for testing +CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs, + final RequestState coordinatorRequestState) { +Objects.requireNonNull(groupId); +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.coordinatorRequestState = coordinatorRequestState; +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +if (this.coordinator != null) { +return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); +} + +if (coordinatorRequestState.canSendRequest(currentTimeMs)) { +
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1043936750 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link RequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait
+ * timer, or a singleton list of
+ * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;

Review Comment:
   Seems like we can get rid of this dependency?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class CoordinatorRequestManager implements RequestManager { + +private final Logger log; +private final Time time; 
+private final long requestTimeoutMs; +
[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller
cmccabe commented on code in PR #12965: URL: https://github.com/apache/kafka/pull/12965#discussion_r1043911867 ## clients/src/main/resources/common/message/BrokerRegistrationRequest.json: ## @@ -51,7 +51,7 @@ }, { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The rack which this broker is in." }, -{ "name": "IsMigratingZkBroker", "type": "int8", "versions": "0+", "taggedVersions": "0+", "tag": 0, "ignorable": true, +{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "0+", "taggedVersions": "0+", "tag": 0, "ignorable": true, Review Comment: see my comment in the main PR conversation thread: this should not be tagged and we need a new version of the rpc ...
[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller
cmccabe commented on code in PR #12965: URL: https://github.com/apache/kafka/pull/12965#discussion_r1043911573 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -690,6 +772,14 @@ class KafkaServer( _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN + if (config.migrationEnabled && lifecycleManager != null) { +// TODO KAFKA-14447 Only use KRaft controlled shutdown (when in migration mode) Review Comment: so the TODO here is that we need to check if the controller is a kraft one? fair enough. I keep thinking that info should be in the metadata cache ... I guess we can do that in a follow-up.
[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller
cmccabe commented on code in PR #12965: URL: https://github.com/apache/kafka/pull/12965#discussion_r1043911086 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -453,6 +534,16 @@ class KafkaServer( dynamicConfigManager = new ZkConfigManager(zkClient, dynamicConfigHandlers) dynamicConfigManager.startup() +if (config.migrationEnabled && lifecycleManager != null) { + lifecycleManager.initialCatchUpFuture.whenComplete { case (_, t) => +if (t != null) { + fatal("Encountered an exception when waiting to catch up with KRaft metadata log", t) + shutdown() +} else { + lifecycleManager.setReadyToUnfence() Review Comment: can we have a log message here (INFO) ?
[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller
cmccabe commented on code in PR #12965: URL: https://github.com/apache/kafka/pull/12965#discussion_r1043910049 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -81,6 +84,19 @@ object KafkaServer { clientConfig } + def zkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = { Review Comment: Hmm. Maybe I'm missing something, but I don't see any relationship with `KafkaServer`. Why put this in `KafkaServer.scala`? Can this be a `static` function in `KafkaZkClient.scala`? (aka "function on `object KafkaZkClient`")?
[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller
cmccabe commented on code in PR #12965: URL: https://github.com/apache/kafka/pull/12965#discussion_r1043908893 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -2230,8 +2230,15 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami validateAdvertisedListenersNonEmptyForBroker() } else { // ZK-based - // controller listener names must be empty when not in KRaft mode - require(controllerListenerNames.isEmpty, s"${KafkaConfig.ControllerListenerNamesProp} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}") + if (migrationEnabled) { +validateNonEmptyQuorumVotersForKRaft() +require(controllerListenerNames.nonEmpty, Review Comment: this seems like a bug. in migration mode you want it to be non-empty, right?
[GitHub] [kafka] cmccabe commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller
cmccabe commented on code in PR #12965: URL: https://github.com/apache/kafka/pull/12965#discussion_r1043907559 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -53,7 +53,8 @@ import scala.jdk.CollectionConverters._ */ class BrokerLifecycleManager(val config: KafkaConfig, Review Comment: can you use the standard style of ``` foo( bar baz quux ) { } ``` this code was written before we adopted that but if we're adding new parameters let's format it that way to avoid having so much indentation
[GitHub] [kafka] cmccabe commented on pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller
cmccabe commented on PR #12965: URL: https://github.com/apache/kafka/pull/12965#issuecomment-1343489745 Thanks for the PR! > This patch uses the new isMigratingZkBroker field in BrokerRegistrationRequest and RegisterBrokerRecord. The type was changed from int8 to bool for BrokerRegistrationRequest (a mistake from https://github.com/apache/kafka/pull/12860). I think there is one other major mistake we made there: this should not be a tagged field, but should be a new version of the RPC. The reason is because this is not safe to ignore. If an old controller treats this broker like an ordinary KRaft broker, that would be a big mistake. While a new version of `BrokerRegistrationRequest` may seem difficult to implement, I think it's actually simpler than it seems. The `NetworkClient` itself should have the `ApiVersions` of the controller available. It gets this when it first connects to the controller. So we can simply take the max of the registration version our broker supports, and the registration version the controller supports. There is already code to do all this in `BrokerRegistrationRequest#Builder`. The only complication is if you need to set `isZkBroker` you need to set `BrokerRegistrationRequest#Builder.oldestAllowedVersion` to 1 rather than 0.
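The version selection described in this comment can be sketched roughly as follows. This is only an illustration, not Kafka's actual code: the class `RegistrationVersionSketch` and the method `negotiate` are hypothetical names, and the sketch assumes that "the max of the versions" means the highest version both sides support, that is, the smaller of the two maxima. The `oldestAllowedVersion` parameter mirrors the builder setting mentioned above.

```java
import java.util.OptionalInt;

final class RegistrationVersionSketch {
    /**
     * Hypothetical helper (not a Kafka API). Returns the highest request version that
     * both peers support, or empty when that version is older than the oldest version
     * the sender is allowed to use.
     */
    static OptionalInt negotiate(int brokerMaxVersion, int controllerMaxVersion, int oldestAllowedVersion) {
        int highestCommon = Math.min(brokerMaxVersion, controllerMaxVersion);
        if (highestCommon < oldestAllowedVersion) {
            // The controller is too old to understand the new field, so no version is usable.
            return OptionalInt.empty();
        }
        return OptionalInt.of(highestCommon);
    }

    public static void main(String[] args) {
        // Ordinary broker talking to an old controller: falls back to version 0.
        System.out.println(negotiate(1, 0, 0));
        // Migrating ZK broker talking to an old controller: nothing usable, registration must fail.
        System.out.println(negotiate(1, 0, 1));
        // Migrating ZK broker talking to a new controller: version 1 carries the flag.
        System.out.println(negotiate(1, 1, 1));
    }
}
```

The point of the sketch is the second case: with a new RPC version the mismatch is detected on the sending side, whereas an ignorable tagged field would be silently dropped by an old controller.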
[GitHub] [kafka] artemlivshits commented on a diff in pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata
artemlivshits commented on code in PR #12956: URL: https://github.com/apache/kafka/pull/12956#discussion_r1043896712 ## clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java: ## @@ -208,10 +209,10 @@ public static MetadataResponse metadataUpdateWith(final String clusterId, for (int i = 0; i < numPartitions; i++) { TopicPartition tp = new TopicPartition(topic, i); Node leader = nodes.get(i % nodes.size()); -List<Integer> replicaIds = Collections.singletonList(leader.id()); +List<Integer> replicaIds = nodes.stream().map(Node::id).collect(Collectors.toList()); partitionMetadata.add(partitionSupplier.supply( Errors.NONE, tp, Optional.of(leader.id()), Optional.ofNullable(epochSupplier.apply(tp)), -replicaIds, replicaIds, replicaIds)); +replicaIds, replicaIds, Collections.emptyList())); Review Comment: Not sure about this change, looks like a re-base mistake. Will take a closer look.
[GitHub] [kafka] ijuma commented on a diff in pull request #12967: MINOR: Replace ArrayBuffer with ListBuffer for better performance
ijuma commented on code in PR #12967: URL: https://github.com/apache/kafka/pull/12967#discussion_r1043889697 ## core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala: ## @@ -484,10 +484,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState * group does not know, because the information is not available yet or because the it has * failed to parse the Consumer Protocol, it returns true to be safe. */ - def isSubscribedToTopic(topic: String): Boolean = subscribedTopics match { -case Some(topics) => topics.contains(topic) -case None => true - } + def isSubscribedToTopic(topic: String): Boolean = subscribedTopics.forall(topics => topics.contains(topic)) Review Comment: How is this related to the PR description? Seems like an unrelated clean-up? Is your before/after test including these changes? Many of them _can_ result in additional allocations since you're replacing pattern matching with lambdas.
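For readers less familiar with the Scala idiom in the diff, here is a rough Java analogue of the two forms, written only as an illustration. The class and method names are made up for this sketch, and `Optional` stands in for Scala's `Option`. Both methods return the same result; the second passes a lambda that captures `topic`, which is the kind of construct that can allocate an extra object per call unless the JIT removes it.

```java
import java.util.Optional;
import java.util.Set;

final class SubscribedTopicsSketch {
    // Explicit branching, analogous to the removed `match` expression.
    static boolean isSubscribedExplicit(Optional<Set<String>> subscribedTopics, String topic) {
        if (subscribedTopics.isPresent()) {
            return subscribedTopics.get().contains(topic);
        }
        return true; // unknown subscription: answer true to be safe
    }

    // Lambda form, analogous to `subscribedTopics.forall(...)`.
    // The lambda captures `topic`, so a closure object may be created on each call.
    static boolean isSubscribedLambda(Optional<Set<String>> subscribedTopics, String topic) {
        return subscribedTopics.map(topics -> topics.contains(topic)).orElse(true);
    }

    public static void main(String[] args) {
        Optional<Set<String>> known = Optional.of(Set.of("orders", "payments"));
        System.out.println(isSubscribedExplicit(known, "orders") == isSubscribedLambda(known, "orders"));
        System.out.println(isSubscribedExplicit(Optional.empty(), "x") == isSubscribedLambda(Optional.empty(), "x"));
    }
}
```

Whether the allocation matters in practice depends on how hot the call site is, which is why the comment asks for the before/after measurements to include these changes.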
[GitHub] [kafka] ijuma commented on pull request #12967: MINOR: Replace ArrayBuffer with ListBuffer for better performance
ijuma commented on PR #12967: URL: https://github.com/apache/kafka/pull/12967#issuecomment-1343464276 > The data structure is converted to Java's List from scala. Why is this? Java's `ArrayList` is array backed, just like `ArrayBuffer` while `ListBuffer` is a linked list. Generally, linked lists are worse due to cache effects and `ArrayBuffer` has amortized constant append too. One case where `ListBuffer` is better is when `ArrayBuffer` usage results in unnecessary allocations (i.e. small lists). A simple way of solving that for `ArrayBuffer` is to size the underlying array to be smaller. I haven't looked at the results in detail yet, but I wanted to share this as I am a bit confused about the PR description.
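The sizing suggestion can be illustrated with the Java collections the comment mentions. This is a sketch under assumptions: the class `SmallListSketch` and its methods are hypothetical, and `ArrayList` and `LinkedList` stand in for Scala's `ArrayBuffer` and `ListBuffer`. Passing the expected size to the `ArrayList` constructor keeps the backing array small for short lists while retaining array-backed storage.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

final class SmallListSketch {
    // Array-backed list whose backing array is sized up front for the known element
    // count, so a short list does not grow into a larger default-capacity array.
    static List<String> arrayBacked(String... items) {
        List<String> list = new ArrayList<>(items.length);
        for (String item : items) {
            list.add(item);
        }
        return list;
    }

    // Linked list: one node object per element and no spare capacity,
    // but elements are scattered in memory rather than stored contiguously.
    static List<String> linked(String... items) {
        List<String> list = new LinkedList<>();
        for (String item : items) {
            list.add(item);
        }
        return list;
    }

    public static void main(String[] args) {
        // Both lists hold the same elements in the same order; only the storage differs.
        System.out.println(arrayBacked("a", "b").equals(linked("a", "b")));
    }
}
```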
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1043805938 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java: ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.utils.ExponentialBackoff; + +// Visible for testing Review Comment: We can get rid of these comments. A general-purpose javadoc would be good though.
[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface
dajac commented on code in PR #12855: URL: https://github.com/apache/kafka/pull/12855#discussion_r1043802283 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = { - -def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs) -new DescribeGroupsResponse(describeGroupsResponseData) - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) -} - + def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val describeRequest = request.body[DescribeGroupsRequest] -val describeGroupsResponseData = new DescribeGroupsResponseData() +val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations +val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]( + describeRequest.data.groups.size +) describeRequest.data.groups.forEach { groupId => if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { - describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED)) +futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError( + groupId, + Errors.GROUP_AUTHORIZATION_FAILED +)) } else { -val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) -val members = summary.members.map { member => - new DescribeGroupsResponseData.DescribedGroupMember() -.setMemberId(member.memberId) -.setGroupInstanceId(member.groupInstanceId.orNull) -.setClientId(member.clientId) -.setClientHost(member.clientHost) -.setMemberAssignment(member.assignment) -.setMemberMetadata(member.metadata) -} - -val describedGroup = new DescribeGroupsResponseData.DescribedGroup() - 
.setErrorCode(error.code) - .setGroupId(groupId) - .setGroupState(summary.state) - .setProtocolType(summary.protocolType) - .setProtocolData(summary.protocol) - .setMembers(members.asJava) - -if (request.header.apiVersion >= 3) { - if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) { - describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId))) +futures += newGroupCoordinator.describeGroup( + request.context, + groupId +).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) => + if (exception != null) { +DescribeGroupsResponse.forError(groupId, Errors.forException(exception)) Review Comment: I did the refactor separately: https://github.com/apache/kafka/pull/12970.
[GitHub] [kafka] dajac opened a new pull request, #12970: MINOR: Small refactor in DescribeGroupsResponse
dajac opened a new pull request, #12970: URL: https://github.com/apache/kafka/pull/12970 This patch does a few cleanups: * It removes `DescribeGroupsResponse.fromError` and pushes its logic to `DescribeGroupsRequest.getErrorResponse` to be consistent with how we implemented the other requests/responses. * It renames `DescribedGroup.forError` to `DescribedGroup.groupError`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] philipnee commented on a diff in pull request #12862: Consumer refator find coordinator
philipnee commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1043793970 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; + +/** + * Handles the timing of the next FindCoordinatorRequest based on the {@link CoordinatorRequestState}. It checks for: + * 1. If there's an existing coordinator. + * 2. If there is an inflight request + * 3. If the backoff timer has expired + * + * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait + * timer, or a singleton list of + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}. + * + * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which + * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node + * will be marked {@code null} upon receiving a failure. 
+ */ +public class CoordinatorRequestManager implements RequestManager { + +private final Logger log; +private final Time time; +private final ErrorEventHandler errorHandler; +private final long rebalanceTimeoutMs; +private final String groupId; + +private final CoordinatorRequestState coordinatorRequestState; +private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while +private Node coordinator; + + +public CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ConsumerConfig config, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs) { +Objects.requireNonNull(groupId); +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.coordinatorRequestState = new CoordinatorRequestState(config); +} + +// Visible for testing +CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs, + final CoordinatorRequestState coordinatorRequestState) { +Objects.requireNonNull(groupId); +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.coordinatorRequestState = coordinatorRequestState; +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +if (this.coordinator != null) { +return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); +
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1043787431 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; + +/** + * Handles the timing of the next FindCoordinatorRequest based on the {@link CoordinatorRequestState}. It checks for: + * 1. If there's an existing coordinator. + * 2. If there is an inflight request + * 3. If the backoff timer has expired + * + * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait + * timer, or a singleton list of + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}. + * + * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which + * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node + * will be marked {@code null} upon receiving a failure. 
+ */ +public class CoordinatorRequestManager implements RequestManager { + +private final Logger log; +private final Time time; +private final ErrorEventHandler errorHandler; +private final long rebalanceTimeoutMs; +private final String groupId; + +private final CoordinatorRequestState coordinatorRequestState; +private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while +private Node coordinator; + + +public CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ConsumerConfig config, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs) { +Objects.requireNonNull(groupId); +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.coordinatorRequestState = new CoordinatorRequestState(config); +} + +// Visible for testing +CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs, + final CoordinatorRequestState coordinatorRequestState) { +Objects.requireNonNull(groupId); +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.coordinatorRequestState = coordinatorRequestState; +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +if (this.coordinator != null) { +return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); +
[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator
hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1043785411 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; + +/** + * Handles the timing of the next FindCoordinatorRequest based on the {@link CoordinatorRequestState}. It checks for: + * 1. If there's an existing coordinator. + * 2. If there is an inflight request + * 3. If the backoff timer has expired + * + * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait + * timer, or a singleton list of + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}. + * + * The FindCoordinatorResponse will be handled by the {@link FindCoordinatorRequestHandler} callback, which + * subsequently invokes {@code onResponse} to handle the exceptions and responses. Note that, the coordinator node + * will be marked {@code null} upon receiving a failure. 
+ */ +public class CoordinatorRequestManager implements RequestManager { + +private final Logger log; +private final Time time; +private final ErrorEventHandler errorHandler; +private final long rebalanceTimeoutMs; +private final String groupId; + +private final CoordinatorRequestState coordinatorRequestState; +private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while +private Node coordinator; + + +public CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ConsumerConfig config, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs) { +Objects.requireNonNull(groupId); +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.coordinatorRequestState = new CoordinatorRequestState(config); +} + +// Visible for testing +CoordinatorRequestManager(final Time time, + final LogContext logContext, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs, + final CoordinatorRequestState coordinatorRequestState) { +Objects.requireNonNull(groupId); +this.time = time; +this.log = logContext.logger(this.getClass()); +this.errorHandler = errorHandler; +this.groupId = groupId; +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +this.coordinatorRequestState = coordinatorRequestState; +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +if (this.coordinator != null) { +return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); +
[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface
dajac commented on code in PR #12855: URL: https://github.com/apache/kafka/pull/12855#discussion_r1043782746 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = { - -def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs) -new DescribeGroupsResponse(describeGroupsResponseData) - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) -} - + def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val describeRequest = request.body[DescribeGroupsRequest] -val describeGroupsResponseData = new DescribeGroupsResponseData() +val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations +val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]( + describeRequest.data.groups.size +) describeRequest.data.groups.forEach { groupId => if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { - describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED)) +futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError( + groupId, + Errors.GROUP_AUTHORIZATION_FAILED +)) } else { -val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) -val members = summary.members.map { member => - new DescribeGroupsResponseData.DescribedGroupMember() -.setMemberId(member.memberId) -.setGroupInstanceId(member.groupInstanceId.orNull) -.setClientId(member.clientId) -.setClientHost(member.clientHost) -.setMemberAssignment(member.assignment) -.setMemberMetadata(member.metadata) -} - -val describedGroup = new DescribeGroupsResponseData.DescribedGroup() - 
.setErrorCode(error.code) - .setGroupId(groupId) - .setGroupState(summary.state) - .setProtocolType(summary.protocolType) - .setProtocolData(summary.protocol) - .setMembers(members.asJava) - -if (request.header.apiVersion >= 3) { - if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) { - describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId))) +futures += newGroupCoordinator.describeGroup( Review Comment: Yeah, I think that I am with you on this point. Keeping `KafkaApis` simple makes sense. Let me give it a shot tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface
hachikuji commented on code in PR #12855: URL: https://github.com/apache/kafka/pull/12855#discussion_r1043778147 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = { - -def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs) -new DescribeGroupsResponse(describeGroupsResponseData) - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) -} - + def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val describeRequest = request.body[DescribeGroupsRequest] -val describeGroupsResponseData = new DescribeGroupsResponseData() +val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations +val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]( + describeRequest.data.groups.size +) describeRequest.data.groups.forEach { groupId => if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { - describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED)) +futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError( + groupId, + Errors.GROUP_AUTHORIZATION_FAILED +)) } else { -val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) -val members = summary.members.map { member => - new DescribeGroupsResponseData.DescribedGroupMember() -.setMemberId(member.memberId) -.setGroupInstanceId(member.groupInstanceId.orNull) -.setClientId(member.clientId) -.setClientHost(member.clientHost) -.setMemberAssignment(member.assignment) -.setMemberMetadata(member.metadata) -} - -val describedGroup = new DescribeGroupsResponseData.DescribedGroup() - 
.setErrorCode(error.code) - .setGroupId(groupId) - .setGroupState(summary.state) - .setProtocolType(summary.protocolType) - .setProtocolData(summary.protocol) - .setMembers(members.asJava) - -if (request.header.apiVersion >= 3) { - if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) { - describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId))) +futures += newGroupCoordinator.describeGroup( Review Comment: I don't have a strong opinion, but I slightly prefer keeping `KafkaApis` simple and having the complexity in the coordinator implementation. I also wonder if that makes optimization easier. For example, even if we have fanout to the respective coordinators, it may still be the case that multiple groups are handled by the same coordinator. Perhaps that would mean fewer messages on the respective queues? Not sure.
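Editorial sketch of the batching idea raised in the comment above: if several requested groups land on the same coordinator, they could be handed over as one batch rather than as one message per group. This is only an illustration in Java. The class name, the method names, and the hash-modulo mapping from group ID to coordinator partition are assumptions made for the example and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GroupsByCoordinatorSketch {

    // Assumption for illustration: a group is mapped to a coordinator
    // partition by a non-negative hash of its ID modulo the partition count.
    static int partitionFor(String groupId, int numPartitions) {
        return (groupId.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Buckets the requested group IDs so that each coordinator partition
    // receives a single batch instead of one message per group.
    static Map<Integer, List<String>> groupByPartition(List<String> groupIds, int numPartitions) {
        Map<Integer, List<String>> batches = new TreeMap<>();
        for (String groupId : groupIds) {
            batches
                .computeIfAbsent(partitionFor(groupId, numPartitions), p -> new ArrayList<>())
                .add(groupId);
        }
        return batches;
    }
}
```

With a batch per partition, the number of messages placed on the coordinator queues is bounded by the number of distinct partitions touched rather than by the number of groups in the request.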
[GitHub] [kafka] dajac opened a new pull request, #12969: MINOR: Small refactor in ApiVersionManager
dajac opened a new pull request, #12969: URL: https://github.com/apache/kafka/pull/12969

For KIP-848, I would like to check in the new APIs without exposing them while the KIP is in development. At first, my goal is to gate them by an internal configuration flag defined in KafkaConfig. Later on, this should be dynamic, based either on MetadataVersion or on another feature flag.

This patch is a small refactor in `ApiVersionManager` to allow filtering APIs. The basic idea is to filter them based on KafkaConfig [here](https://github.com/apache/kafka/compare/trunk...dajac:kafka:minor-refacor-apiversion-manager?expand=1#diff-b89d6d5e31fa117d2dfb7d242c5035c18556c783f58360ca3b42801100640090R44), so instead of letting the `ApiVersionManager` determine the available APIs based on the listener type, we give it the allowed set. This is definitely not the end state that I want, but it is a first step towards it. It will at least allow me to get these APIs checked in safely. We can come back to the dynamic part of it a bit later.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan opened a new pull request, #12968: KAFKA-14417:Addressing the error server side
jolshan opened a new pull request, #12968: URL: https://github.com/apache/kafka/pull/12968

After some back and forth, it seems best to return an alternative error code that old clients can handle.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hachikuji commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface
hachikuji commented on code in PR #12855: URL: https://github.com/apache/kafka/pull/12855#discussion_r1043769009 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = { - -def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs) -new DescribeGroupsResponse(describeGroupsResponseData) - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) -} - + def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val describeRequest = request.body[DescribeGroupsRequest] -val describeGroupsResponseData = new DescribeGroupsResponseData() +val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations +val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]( + describeRequest.data.groups.size +) describeRequest.data.groups.forEach { groupId => if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { - describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED)) +futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError( + groupId, + Errors.GROUP_AUTHORIZATION_FAILED +)) } else { -val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) -val members = summary.members.map { member => - new DescribeGroupsResponseData.DescribedGroupMember() -.setMemberId(member.memberId) -.setGroupInstanceId(member.groupInstanceId.orNull) -.setClientId(member.clientId) -.setClientHost(member.clientHost) -.setMemberAssignment(member.assignment) -.setMemberMetadata(member.metadata) -} - -val describedGroup = new DescribeGroupsResponseData.DescribedGroup() - 
.setErrorCode(error.code) - .setGroupId(groupId) - .setGroupState(summary.state) - .setProtocolType(summary.protocolType) - .setProtocolData(summary.protocol) - .setMembers(members.asJava) - -if (request.header.apiVersion >= 3) { - if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) { - describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId))) +futures += newGroupCoordinator.describeGroup( + request.context, + groupId +).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) => + if (exception != null) { +DescribeGroupsResponse.forError(groupId, Errors.forException(exception)) Review Comment: Yeah, I was wondering why we didn't use `getErrorResponse`.
[GitHub] [kafka] mumrah commented on pull request #12913: KIP-866 Allow ZK brokers to register in KRaft
mumrah commented on PR #12913: URL: https://github.com/apache/kafka/pull/12913#issuecomment-1343283462 @mimaison here's the new PR https://github.com/apache/kafka/pull/12965
[GitHub] [kafka] cadonna commented on pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
cadonna commented on PR #12935: URL: https://github.com/apache/kafka/pull/12935#issuecomment-1343280080 Backported to 3.3, 3.2, 3.1, and 3.0.
[GitHub] [kafka] dajac merged pull request #12932: KAFKA-14425; The Kafka protocol should support nullable structs
dajac merged PR #12932: URL: https://github.com/apache/kafka/pull/12932
[GitHub] [kafka] dajac commented on pull request #12932: KAFKA-14425; The Kafka protocol should support nullable structs
dajac commented on PR #12932: URL: https://github.com/apache/kafka/pull/12932#issuecomment-1343231442 The KIP has been accepted.
[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface
dajac commented on code in PR #12855: URL: https://github.com/apache/kafka/pull/12855#discussion_r1043727578 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = { - -def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs) -new DescribeGroupsResponse(describeGroupsResponseData) - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) -} - + def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val describeRequest = request.body[DescribeGroupsRequest] -val describeGroupsResponseData = new DescribeGroupsResponseData() +val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations +val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]( + describeRequest.data.groups.size +) describeRequest.data.groups.forEach { groupId => if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { - describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED)) +futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError( + groupId, + Errors.GROUP_AUTHORIZATION_FAILED +)) } else { -val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) -val members = summary.members.map { member => - new DescribeGroupsResponseData.DescribedGroupMember() -.setMemberId(member.memberId) -.setGroupInstanceId(member.groupInstanceId.orNull) -.setClientId(member.clientId) -.setClientHost(member.clientHost) -.setMemberAssignment(member.assignment) -.setMemberMetadata(member.metadata) -} - -val describedGroup = new DescribeGroupsResponseData.DescribedGroup() - 
.setErrorCode(error.code) - .setGroupId(groupId) - .setGroupState(summary.state) - .setProtocolType(summary.protocolType) - .setProtocolData(summary.protocol) - .setMembers(members.asJava) - -if (request.header.apiVersion >= 3) { - if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) { - describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId))) +futures += newGroupCoordinator.describeGroup( + request.context, + groupId +).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) => + if (exception != null) { +DescribeGroupsResponse.forError(groupId, Errors.forException(exception)) Review Comment: Sure. Let me clean this up. I wonder if we should also remove `forError` and push that code to `DescribeGroupsRequest.getErrorResponse` in order to be consistent with the other requests.
[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface
dajac commented on code in PR #12855: URL: https://github.com/apache/kafka/pull/12855#discussion_r1043725962 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = { - -def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs) -new DescribeGroupsResponse(describeGroupsResponseData) - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) -} - + def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val describeRequest = request.body[DescribeGroupsRequest] -val describeGroupsResponseData = new DescribeGroupsResponseData() +val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations +val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]( + describeRequest.data.groups.size +) describeRequest.data.groups.forEach { groupId => if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { - describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED)) +futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError( + groupId, + Errors.GROUP_AUTHORIZATION_FAILED +)) } else { -val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) -val members = summary.members.map { member => - new DescribeGroupsResponseData.DescribedGroupMember() -.setMemberId(member.memberId) -.setGroupInstanceId(member.groupInstanceId.orNull) -.setClientId(member.clientId) -.setClientHost(member.clientHost) -.setMemberAssignment(member.assignment) -.setMemberMetadata(member.metadata) -} - -val describedGroup = new DescribeGroupsResponseData.DescribedGroup() - 
.setErrorCode(error.code) - .setGroupId(groupId) - .setGroupState(summary.state) - .setProtocolType(summary.protocolType) - .setProtocolData(summary.protocol) - .setMembers(members.asJava) - -if (request.header.apiVersion >= 3) { - if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) { - describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId))) +futures += newGroupCoordinator.describeGroup( Review Comment: There is no particular reason, I guess. @hachikuji The reason I kept it like this is that we will need to fan out anyway with the new controller, as we will have multiple event loops. I suppose that we could say that this is an internal implementation detail, so not exposing it in the interface would make sense. What do you think?
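Editorial sketch of the fan-out under discussion: one future per group, each mapped to a per-group error on failure, then combined into a single future for the whole response. It is written in Java with plain strings standing in for `DescribedGroup`. The class name and the `describeGroup` stub are hypothetical and do not reflect the real `GroupCoordinator` interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class DescribeGroupsFanOutSketch {

    // Hypothetical stand-in for an asynchronous per-group describe call.
    // An empty group ID fails, to exercise the error path.
    static CompletableFuture<String> describeGroup(String groupId) {
        if (groupId.isEmpty()) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("empty group id"));
            return failed;
        }
        return CompletableFuture.completedFuture("described:" + groupId);
    }

    // Fans out one future per group, converts failures into per-group error
    // entries, and combines everything into one future in request order.
    static CompletableFuture<List<String>> describeAll(List<String> groupIds) {
        List<CompletableFuture<String>> futures = new ArrayList<>(groupIds.size());
        for (String groupId : groupIds) {
            futures.add(describeGroup(groupId)
                .handle((result, exception) ->
                    exception != null ? "error:" + groupId : result));
        }
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                List<String> results = new ArrayList<>(futures.size());
                for (CompletableFuture<String> future : futures) {
                    results.add(future.join()); // already complete at this point
                }
                return results;
            });
    }
}
```

Whether this combining step lives in the request handler or behind the coordinator interface is exactly the design question raised in the thread; the sketch only shows that the caller can end up with a single future either way.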
[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12855: URL: https://github.com/apache/kafka/pull/12855#discussion_r1043708573 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = { - -def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs) -new DescribeGroupsResponse(describeGroupsResponseData) - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) -} - + def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val describeRequest = request.body[DescribeGroupsRequest] -val describeGroupsResponseData = new DescribeGroupsResponseData() +val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations +val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]( + describeRequest.data.groups.size +) describeRequest.data.groups.forEach { groupId => if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { - describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED)) +futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError( + groupId, + Errors.GROUP_AUTHORIZATION_FAILED +)) } else { -val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) -val members = summary.members.map { member => - new DescribeGroupsResponseData.DescribedGroupMember() -.setMemberId(member.memberId) -.setGroupInstanceId(member.groupInstanceId.orNull) -.setClientId(member.clientId) -.setClientHost(member.clientHost) -.setMemberAssignment(member.assignment) -.setMemberMetadata(member.metadata) -} - -val describedGroup = new DescribeGroupsResponseData.DescribedGroup() - 
.setErrorCode(error.code) - .setGroupId(groupId) - .setGroupState(summary.state) - .setProtocolType(summary.protocolType) - .setProtocolData(summary.protocol) - .setMembers(members.asJava) - -if (request.header.apiVersion >= 3) { - if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) { - describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId))) +futures += newGroupCoordinator.describeGroup( Review Comment: Any idea why the original semantics were to handle each group one by one?
[jira] [Resolved] (KAFKA-14417) Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
[ https://issues.apache.org/jira/browse/KAFKA-14417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-14417.
    Fix Version/s: 4.0.0, 3.3.2
    Resolution: Fixed

> Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
>
> Key: KAFKA-14417
> URL: https://issues.apache.org/jira/browse/KAFKA-14417
> Project: Kafka
> Issue Type: Task
> Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0
> Reporter: Justine Olshan
> Assignee: Justine Olshan
> Priority: Blocker
> Fix For: 4.0.0, 3.3.2
>
> In TransactionManager we have a handler for InitProducerIdRequests
> [https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#LL1276C14-L1276C14]
> However, we have the potential to return a REQUEST_TIMED_OUT error in RPCProducerIdManager when the BrokerToControllerChannel manager times out:
> [https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala#L236]
>
> or when the poll returns null:
> [https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala#L170]
> Since REQUEST_TIMED_OUT is not handled by the producer, we treat it as a fatal error and the producer fails. With the default of idempotent producers, this can cause more issues.
> See this stack trace from 3.0:
> {code:java}
> ERROR [Producer clientId=console-producer] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
> org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The request timed out.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1390)
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1294)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:658)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:650)
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:256)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> Seems like the commit that introduced the changes was this one:
> [https://github.com/apache/kafka/commit/72d108274c98dca44514007254552481c731c958]
> so we are vulnerable when the server code is ibp 3.0 and beyond.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hachikuji commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface
hachikuji commented on code in PR #12855: URL: https://github.com/apache/kafka/pull/12855#discussion_r1043700608 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = { - -def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs) -new DescribeGroupsResponse(describeGroupsResponseData) - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) -} - + def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val describeRequest = request.body[DescribeGroupsRequest] -val describeGroupsResponseData = new DescribeGroupsResponseData() +val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations +val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]( + describeRequest.data.groups.size +) describeRequest.data.groups.forEach { groupId => if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { - describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED)) +futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError( + groupId, + Errors.GROUP_AUTHORIZATION_FAILED +)) } else { -val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) -val members = summary.members.map { member => - new DescribeGroupsResponseData.DescribedGroupMember() -.setMemberId(member.memberId) -.setGroupInstanceId(member.groupInstanceId.orNull) -.setClientId(member.clientId) -.setClientHost(member.clientHost) -.setMemberAssignment(member.assignment) -.setMemberMetadata(member.metadata) -} - -val describedGroup = new DescribeGroupsResponseData.DescribedGroup() - 
.setErrorCode(error.code) - .setGroupId(groupId) - .setGroupState(summary.state) - .setProtocolType(summary.protocolType) - .setProtocolData(summary.protocol) - .setMembers(members.asJava) - -if (request.header.apiVersion >= 3) { - if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) { - describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId))) +futures += newGroupCoordinator.describeGroup( Review Comment: I wonder if it would be simpler to pass through all groups that we want to describe. Then we just have a single future. ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = { - -def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs) -new DescribeGroupsResponse(describeGroupsResponseData) - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) -} - + def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val describeRequest = request.body[DescribeGroupsRequest] -val describeGroupsResponseData = new DescribeGroupsResponseData() +val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations +val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]( + describeRequest.data.groups.size +) describeRequest.data.groups.forEach { groupId => if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { - describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED)) +futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError( + groupId, + Errors.GROUP_AUTHORIZATION_FAILED 
+)) } else { -val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) -val members = summary.members.map { member => - new DescribeGroupsResponseData.DescribedGroupMember() -.setMemberId(member.memberId) -.setGroupInstanceId(member.groupInstanceId.orNull) -.setClientId(member.clientId) -.setClientHost(member.clientHost) -.setMemberAssignment(member.assignment) -
[GitHub] [kafka] mumrah commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller
mumrah commented on code in PR #12965: URL: https://github.com/apache/kafka/pull/12965#discussion_r1043473436 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -81,6 +84,19 @@ object KafkaServer { clientConfig } + def zkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = { Review Comment: This was meant to go in with #12946. It's a small refactor to make it easy to create a KafkaZkClient from ControllerServer (which is TBD)
[GitHub] [kafka] hachikuji commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs
hachikuji commented on code in PR #12963: URL: https://github.com/apache/kafka/pull/12963#discussion_r1043680891 ## core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala: ## @@ -166,15 +172,15 @@ class BrokerMetadataListenerTest { image = newImage } -override def publishedOffset: Long = -1 +override def publishedEndOffset: Long = -1 } private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw") private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg") private def generateManyRecords(listener: BrokerMetadataListener, endOffset: Long): Unit = { -(0 to 1).foreach { _ => +(0 until 1).foreach { _ => listener.handleCommit( RecordTestUtils.mockBatchReader( endOffset, Review Comment: Are you referring to the removing replicas changes here?
[GitHub] [kafka] hachikuji commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs
hachikuji commented on code in PR #12963: URL: https://github.com/apache/kafka/pull/12963#discussion_r1043679688 ## core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala: ## @@ -166,15 +172,15 @@ class BrokerMetadataListenerTest { image = newImage } -override def publishedOffset: Long = -1 +override def publishedEndOffset: Long = -1 } private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw") private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg") private def generateManyRecords(listener: BrokerMetadataListener, endOffset: Long): Unit = { Review Comment: I changed the method here to make `endOffset` exclusive, so it seems ok now? Or am I missing something?
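For readers less familiar with Scala ranges: the `0 to 1` → `0 until 1` change quoted in this thread swaps an inclusive range for an exclusive one. A rough Java analogue follows; the class and method names are hypothetical, and Java streams only stand in for the Scala ranges used in the test.

```java
import java.util.stream.IntStream;

// Hypothetical illustration, not code from the PR:
// IntStream.rangeClosed/range mirror Scala's `to`/`until`.
final class RangeBounds {
    // Like Scala's `start to end`: both endpoints are included.
    static long inclusiveCount(int start, int end) {
        return IntStream.rangeClosed(start, end).count();
    }

    // Like Scala's `start until end`: the end is excluded.
    static long exclusiveCount(int start, int end) {
        return IntStream.range(start, end).count();
    }

    public static void main(String[] args) {
        System.out.println(inclusiveCount(0, 1)); // iterates over 0 and 1
        System.out.println(exclusiveCount(0, 1)); // iterates over 0 only
    }
}
```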
[GitHub] [kafka] mumrah merged pull request #12946: KAFKA-14427 ZK client support for migrations
mumrah merged PR #12946: URL: https://github.com/apache/kafka/pull/12946
[GitHub] [kafka] cmccabe commented on a diff in pull request #12964: MINOR: Introduce MetadataProvenance and ImageReWriter
cmccabe commented on code in PR #12964: URL: https://github.com/apache/kafka/pull/12964#discussion_r1043673991 ## metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.raft.OffsetAndEpoch; + +import java.util.Objects; + + +/** + * Information about the source of a metadata image. + */ +public final class MetadataProvenance { +public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, 0, 0L); + +private final long offset; +private final int epoch; +private final long lastContainedLogTimeMs; + +public MetadataProvenance( +long offset, +int epoch, +long lastContainedLogTimeMs +) { +this.offset = offset; +this.epoch = epoch; +this.lastContainedLogTimeMs = lastContainedLogTimeMs; +} + +public OffsetAndEpoch offsetAndEpoch() { +return new OffsetAndEpoch(offset, epoch); +} + +public long offset() { +return offset; +} + +public int epoch() { +return epoch; +} + +public long lastContainedLogTimeMs() { +return lastContainedLogTimeMs; +} + +public String snapshotName() { Review Comment: It is used in the new metadata loader, which is not part of this PR. 
It is the name that a snapshot with the given provenance (offset, epoch) would have. I will add a Javadoc comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #12946: KAFKA-14427 ZK client support for migrations
mumrah commented on PR #12946: URL: https://github.com/apache/kafka/pull/12946#issuecomment-1343113991 Latest build looks good. A few unrelated flaky tests:
```
Build / JDK 8 and Scala 2.12 / org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription() | 1 min 2 sec | 1
Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsExpirationTest.testTransactionAfterProducerIdExpires(String).quorum=kraft | 8.8 sec | 1
Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[1] true
```
[GitHub] [kafka] cmccabe commented on a diff in pull request #12964: MINOR: Introduce MetadataProvenance and ImageReWriter
cmccabe commented on code in PR #12964: URL: https://github.com/apache/kafka/pull/12964#discussion_r1043672488 ## metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.raft.OffsetAndEpoch; + +import java.util.Objects; + + +/** + * Information about the source of a metadata image. + */ +public final class MetadataProvenance { +public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, 0, 0L); Review Comment: yes, that's fair. I suppose they should all be -1 because 0 implies that you have read and seen offset / epoch / timestamp 0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
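A minimal sketch of the sentinel idea discussed in this comment, assuming all three fields use -1 so that an empty provenance cannot be confused with "offset / epoch / timestamp 0 has been seen". `ProvenanceSketch` and `isEmpty` are hypothetical names for illustration; this is not the Kafka `MetadataProvenance` class.

```java
// Hypothetical stand-in for the class under review; not the Kafka source.
final class ProvenanceSketch {
    // Every field is -1, so EMPTY never looks like "we have read offset/epoch/time 0".
    static final ProvenanceSketch EMPTY = new ProvenanceSketch(-1L, -1, -1L);

    final long offset;
    final int epoch;
    final long lastContainedLogTimeMs;

    ProvenanceSketch(long offset, int epoch, long lastContainedLogTimeMs) {
        this.offset = offset;
        this.epoch = epoch;
        this.lastContainedLogTimeMs = lastContainedLogTimeMs;
    }

    // True only when all three fields still hold the sentinel value.
    boolean isEmpty() {
        return offset == -1L && epoch == -1 && lastContainedLogTimeMs == -1L;
    }
}
```

With a 0-valued sentinel, a provenance that genuinely points at offset 0 in epoch 0 would be indistinguishable from "nothing loaded yet"; the -1 values avoid that ambiguity.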
[GitHub] [kafka] cmccabe commented on a diff in pull request #12964: MINOR: Introduce MetadataProvenance and ImageReWriter
cmccabe commented on code in PR #12964: URL: https://github.com/apache/kafka/pull/12964#discussion_r1043668840 ## metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java: ## @@ -152,19 +153,7 @@ public Optional metadataVersionChanged() { } } -public void read(long highestOffset, int highestEpoch, Iterator> reader) { -while (reader.hasNext()) { -List batch = reader.next(); -for (ApiMessageAndVersion messageAndVersion : batch) { -replay(highestOffset, highestEpoch, messageAndVersion.message()); -} -} -} - -public void replay(long offset, int epoch, ApiMessage record) { -highestOffset = offset; -highestEpoch = epoch; - +public void replay(ApiMessage record) { Review Comment: Offset and epoch are a bit silly in cases like when loading a snapshot, where all records have the same offset and epoch. This information is better tracked by the metadata loader and if we need it, we always have it there. We also have other information such as the source of the records and so on, in the loader. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
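The split of responsibilities described in this comment could look roughly like the sketch below: the delta only replays records, while the loader remembers where a batch came from. All names (`DeltaSketch`, `LoaderSketch`, `loadBatch`) and the use of `String` records are assumptions for illustration and do not reflect the actual metadata loader.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical delta: it accumulates records and knows nothing about offsets or epochs.
final class DeltaSketch {
    final List<String> replayed = new ArrayList<>();

    void replay(String record) {
        replayed.add(record);
    }
}

// Hypothetical loader: it owns the position information for each batch it feeds to the delta.
final class LoaderSketch {
    long lastOffset = -1L;
    int lastEpoch = -1;
    final DeltaSketch delta = new DeltaSketch();

    // For a snapshot, every record shares the same offset and epoch,
    // so tracking them once per batch is sufficient.
    void loadBatch(long batchLastOffset, int epoch, List<String> records) {
        for (String record : records) {
            delta.replay(record);
        }
        lastOffset = batchLastOffset;
        lastEpoch = epoch;
    }
}
```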
[GitHub] [kafka] divijvaidya opened a new pull request, #12967: MINOR: Replace ArrayBuffer with ListBuffer for better performance
divijvaidya opened a new pull request, #12967: URL: https://github.com/apache/kafka/pull/12967 `ListBuffer` should be preferred over `ArrayBuffer` in cases where: 1. No indexing is required. 2. The data structure is converted from Scala to Java's List. `ListBuffer` has [guaranteed constant-time performance](https://docs.scala-lang.org/overviews/collections-2.13/performance-characteristics.html) for append operations, which makes it an attractive data structure for cases where we create a list and append multiple entries to it. This PR changes usages of `ArrayBuffer` to `ListBuffer` in latency-sensitive code paths. To demonstrate the usefulness of this change, I am attaching a CPU profile for an [open messaging benchmark](https://openmessaging.cloud/docs/benchmarks/). Please note that the self-time of `ReplicaManager.fetchMessages()` is reduced after this change (depicted by the empty space at the top of the frame for `ReplicaManager.fetchMessages()`). **Before this PR** ![Screenshot 2022-12-08 at 15 30 24](https://user-images.githubusercontent.com/71267/206517187-6ebbcad5-00b5-4706-a2b8-5c64dce71d9a.png) **After this PR** ![Screenshot 2022-12-08 at 15 30 37](https://user-images.githubusercontent.com/71267/206517243-97938193-449d-4e46-9c32-fb9bc82c0469.png) Note: This PR piggybacks some general Scala cleanup regarding usage of Optionals.
[jira] [Commented] (KAFKA-14340) KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates
[ https://issues.apache.org/jira/browse/KAFKA-14340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644883#comment-17644883 ] Bart Van Bos commented on KAFKA-14340: -- Duplicate of https://issues.apache.org/jira/browse/KAFKA-12807 > KIP-880: X509 SAN based SPIFFE URI ACL within mTLS Client Certificates > --- > > Key: KAFKA-14340 > URL: https://issues.apache.org/jira/browse/KAFKA-14340 > Project: Kafka > Issue Type: Wish > Components: security >Affects Versions: 3.3.1 >Reporter: Bart Van Bos >Priority: Minor > > Istio and other *SPIFFE* based systems use X509 Client Certificates to > provide workload ID. Kafka currently does support Client Cert based AuthN/Z > and mapping to ACL, but only by inspecting the CN field within a Client > Certificate. > There are several POC implementations out there implementing a bespoke > _KafkaPrincipalBuilder_ implementation for this purpose. Two examples include > * [https://github.com/traiana/kafka-spiffe-principal] > * [https://github.com/boeboe/kafka-istio-principal-builder] (written by > myself) > The gist is to introspect X509 based client certificates, look for a URI > based SPIFFE entry in the SAN extension and return that as a principal that > can be used to write ACL rules. > This KIP request is to include this functionality into Kafka's main > functionality so end-users don't need to load custom and non-vetted java > classes/implementations. > The main use case for me is having a lot of Istio customers that express the > wish to be able to leverage SPIFFE based IDs for their Kafka ACL > Authorization. This eliminates the need for sidecars on the broker side or > custom _EnvoyFilters_ and other less optimal implementations to integrate > Kafka into an Istio secured Kubernetes environment. > I believe this would make for a better integration between the Istio/SPIFFE > and Kafka ecosystems. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
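The lookup described in the issue (scan the SAN extension for a URI entry that is a SPIFFE ID) can be sketched with the JDK's `X509Certificate.getSubjectAlternativeNames()`, which returns entries as `[type, value]` lists where type 6 denotes a URI. The class name `SpiffeSanSketch` and its methods are hypothetical; wiring the result into a `KafkaPrincipalBuilder` is deliberately left out, and this is not the code of either linked POC.

```java
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

// Hypothetical helper for illustration only.
final class SpiffeSanSketch {
    // GeneralName tag for uniformResourceIdentifier in the SAN extension.
    private static final int SAN_TYPE_URI = 6;

    // Returns the first SAN URI that uses the spiffe:// scheme, if any.
    static Optional<String> spiffeUriFromSans(Collection<List<?>> sans) {
        if (sans == null) {
            return Optional.empty(); // certificate has no SAN extension
        }
        for (List<?> entry : sans) {
            if (entry.size() >= 2
                    && Integer.valueOf(SAN_TYPE_URI).equals(entry.get(0))
                    && entry.get(1) instanceof String) {
                String uri = (String) entry.get(1);
                if (uri.startsWith("spiffe://")) {
                    return Optional.of(uri);
                }
            }
        }
        return Optional.empty();
    }

    // Convenience overload for a parsed client certificate.
    static Optional<String> spiffeUriFromCert(X509Certificate cert) throws CertificateParsingException {
        return spiffeUriFromSans(cert.getSubjectAlternativeNames());
    }
}
```

A principal builder could then fall back to the CN-based behaviour whenever the returned `Optional` is empty.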
[jira] [Resolved] (KAFKA-14352) Support rack-aware partition assignment for Kafka consumers
[ https://issues.apache.org/jira/browse/KAFKA-14352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-14352. Reviewer: David Jacot Resolution: Fixed This includes protocol changes for rack-aware assignment. Default assignors will be made rack-aware under follow-on tickets in the next release. > Support rack-aware partition assignment for Kafka consumers > --- > > Key: KAFKA-14352 > URL: https://issues.apache.org/jira/browse/KAFKA-14352 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.4.0 > > > KIP-392 added support for consumers to fetch from the replica in their local > rack. To benefit from locality, consumers need to be assigned partitions > which have a replica in the same rack. This works well when replication > factor is the same as the number of racks, since every rack would then have a > replica with rack-aware replica assignment. If the number of racks is higher, > some racks may not have replicas of some partitions and hence consumers in > these racks will have to fetch from another rack. It will be useful to > propagate rack in the subscription metadata for consumers and provide a > rack-aware partition assignor for consumers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
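To make the locality argument concrete, here is a small greedy sketch: each partition goes to the least-loaded consumer whose rack hosts a replica, falling back to the least-loaded consumer overall when no rack matches. This is only an illustration under simplified assumptions; `RackAwareSketch` and its data shapes are hypothetical and do not reflect the protocol changes or any Kafka assignor.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical greedy assignment, for illustration only.
final class RackAwareSketch {
    // replicaRacks: partition -> racks that host a replica of it
    // consumerRacks: consumer id -> rack of that consumer
    // Returns partition -> chosen consumer.
    static Map<Integer, String> assign(Map<Integer, Set<String>> replicaRacks,
                                       Map<String, String> consumerRacks) {
        Map<String, Integer> load = new TreeMap<>();
        for (String consumer : consumerRacks.keySet()) {
            load.put(consumer, 0);
        }
        Map<Integer, String> result = new TreeMap<>();
        for (Map.Entry<Integer, Set<String>> partition : new TreeMap<>(replicaRacks).entrySet()) {
            String best = null;
            boolean bestLocal = false;
            for (String consumer : load.keySet()) {
                boolean local = partition.getValue().contains(consumerRacks.get(consumer));
                // Locality wins first; load only breaks ties between equally local consumers.
                if (best == null
                        || (local && !bestLocal)
                        || (local == bestLocal && load.get(consumer) < load.get(best))) {
                    best = consumer;
                    bestLocal = local;
                }
            }
            if (best == null) {
                throw new IllegalArgumentException("no consumers to assign to");
            }
            result.put(partition.getKey(), best);
            load.merge(best, 1, Integer::sum);
        }
        return result;
    }
}
```

When there are more racks than replicas, some partitions have no local consumer and take the fallback path, which is the cross-rack fetch case the ticket describes.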
[GitHub] [kafka] cadonna commented on a diff in pull request #12959: MINOR: Fix various memory leaks in tests
cadonna commented on code in PR #12959: URL: https://github.com/apache/kafka/pull/12959#discussion_r1043244790 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java: ## @@ -130,6 +134,8 @@ public void shouldDeleteKeyAndPropagateV0() { .withValue(new Change<>(newValue, oldValue)), forwarded.get(0).record() ); + +stateStore.close(); Review Comment: Why do you close the state store in the test here but not in the other tests? ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java: ## @@ -148,6 +148,11 @@ private void verifyDBOptionsMethodCall(final Method method) throws Exception { assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class)); assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(), matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):")); +} finally { +reset(mockedDbOptions); +mockedDbOptions.close(); +replay(mockedDbOptions); +optionsFacadeDbOptions.close(); Review Comment: I think it would be enough to just `optionsFacadeDbOptions.close();`. You already verify that `optionsFacadeDbOptions.close();` calls `mockedDbOptions.close();` in the try-block. No need to verify it again. Alternatively, you could add `close` to the list of ignored methods (`ignoreMethods`) and verify as you did. However, you need to add `verify(mockedDbOptions)` after `optionsFacadeDbOptions.close()` otherwise nothing is verified. 
## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java: ## @@ -333,23 +343,23 @@ public void shouldLogWarningWhenSettingWalOptions() throws Exception { try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class)) { -final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter -= new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions()); - -for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) { -if (walRelatedMethods.contains(method.getName())) { -method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes())); +try (RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter = + new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions())) { +for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) { +if (walRelatedMethods.contains(method.getName())) { +method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes())); +} } -} -final List walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds"); +final List walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds"); -final Set logMessages = appender.getEvents().stream() -.filter(e -> e.getLevel().equals("WARN")) -.map(LogCaptureAppender.Event::getMessage) -.collect(Collectors.toSet()); +final Set logMessages = appender.getEvents().stream() +.filter(e -> e.getLevel().equals("WARN")) +.map(LogCaptureAppender.Event::getMessage) +.collect(Collectors.toSet()); -walOptions.forEach(option -> assertThat(logMessages, 
hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB. Setting option '%s' will be ignored", option; +walOptions.forEach(option -> assertThat(logMessages, hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB. Setting option '%s' will be ignored", option; +} Review Comment: ```suggestion ``` ## streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java: ## @@ -480,6 +480,7 @@ public void shouldCreateWriteBatches() { assertEquals(2, writeBatchMap.size()); for (final WriteBatch batch : writeBatchMap.values()) { assertEquals(1, batch.count()); +batch.close(); Review Comment:
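The `try (...)` pattern used in the diff above guarantees that a closeable resource is released even when the test body throws. A self-contained sketch with a hypothetical resource type (`NativeHandleSketch` stands in for objects such as `DBOptions`; none of this is RocksDB or Kafka code):

```java
// Hypothetical resource that records whether it was closed.
final class NativeHandleSketch implements AutoCloseable {
    private boolean closed = false;

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class CloseInTestSketch {
    // Normal path: the handle is closed when the try block ends.
    static NativeHandleSketch useAndClose() {
        NativeHandleSketch handle = new NativeHandleSketch();
        try (NativeHandleSketch h = handle) {
            // test body would use `h` here
        }
        return handle;
    }

    // Failure path: the handle is closed before the catch block runs.
    static NativeHandleSketch useAndCloseOnFailure() {
        NativeHandleSketch handle = new NativeHandleSketch();
        try (NativeHandleSketch h = handle) {
            throw new IllegalStateException("simulated assertion failure");
        } catch (IllegalStateException e) {
            // swallowed only so the sketch can return the handle for inspection
        }
        return handle;
    }
}
```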
[GitHub] [kafka] mumrah commented on a diff in pull request #12965: KAFKA-14448 Let ZK brokers register with KRaft controller
mumrah commented on code in PR #12965: URL: https://github.com/apache/kafka/pull/12965#discussion_r1043473436 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -81,6 +84,19 @@ object KafkaServer { clientConfig } + def zkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = { Review Comment: This is coming from #12946 which is getting merged first
[GitHub] [kafka] mumrah commented on a diff in pull request #12964: MINOR: Introduce MetadataProvenance and ImageReWriter
mumrah commented on code in PR #12964: URL: https://github.com/apache/kafka/pull/12964#discussion_r1043452505 ## metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.raft.OffsetAndEpoch; + +import java.util.Objects; + + +/** + * Information about the source of a metadata image. + */ +public final class MetadataProvenance { +public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, 0, 0L); + +private final long offset; +private final int epoch; +private final long lastContainedLogTimeMs; + +public MetadataProvenance( +long offset, +int epoch, +long lastContainedLogTimeMs +) { +this.offset = offset; +this.epoch = epoch; +this.lastContainedLogTimeMs = lastContainedLogTimeMs; +} + +public OffsetAndEpoch offsetAndEpoch() { +return new OffsetAndEpoch(offset, epoch); +} + +public long offset() { +return offset; +} + +public int epoch() { +return epoch; +} + +public long lastContainedLogTimeMs() { +return lastContainedLogTimeMs; +} + +public String snapshotName() { Review Comment: Is this used anywhere? Is it meant to match the name of the snapshot file? 
## metadata/src/main/java/org/apache/kafka/image/MetadataVersionChange.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.Objects; + + +/** + * A change in the MetadataVersion. + */ +public final class MetadataVersionChange { Review Comment: Would it be useful to have methods like "isUpgrade" or "isDowngrade" here? ## metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.raft.OffsetAndEpoch; + +import java.util.Objects; + + +/** + * Information about the source of a metadata image. + */ +public final class MetadataProvenance { +public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, 0, 0L); Review Comment: We should probably make these sentinels the same as the initial values used in BrokerMetadataListener. That way, if someone read `provenance()` off the listener before we processed anything, it would equal EMPTY ## metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java: ## @@ -152,19 +153,7 @@ public
[jira] [Updated] (KAFKA-14456) Fix AdminUtils startIndex for rack aware partition creations
[ https://issues.apache.org/jira/browse/KAFKA-14456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14456: - Description: When new partitions are added/created we calculate a start index based off all the brokers here [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L270]. That start index is passed through to AdminUtils and is used to find a starting position in the list of brokers for making assignments. However, when we make rack aware assignments we use that index into a rack alternating list here [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L160]. The meaning of the index gets lost: the index into the full list of brokers doesn't seem to have the same meaning as the index into a rack alternating list. I discovered this when I published [https://github.com/apache/kafka/pull/12943/files]. In that PR I added a test testRackAwarePartitionAssignment which does not work for ZK mode. was: When new partitions are added/created we calculate a start index based off all the brokers here [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L270]. That start index is passed through to AdminUtils and is used to find a starting position in the list of brokers for making assignments. However, when we make rack aware assignments we use that index into a rack alternating list here [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L160]. The meaning of the index gets lost: the index into the full list of brokers doesn't seem to have the same meaning as the index into a rack alternating list. I discovered this when I published [https://github.com/apache/kafka/pull/12943/files]. In that PR I added a test testRackAwarePartitionAssignment which does not work for ZK mode.
> Fix AdminUtils startIndex for rack aware partition creations > - > > Key: KAFKA-14456 > URL: https://issues.apache.org/jira/browse/KAFKA-14456 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Priority: Major > > When new partitions are added/created we calculate a start index based off > all the brokers here > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L270]. > That start index is passed through to AdminUtils and is used to find a > starting position in the list of brokers for making assignments. However, > when we make rack aware assignments we use that index into a rack alternating > list here > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L160]. > The meaning of the index gets lost: the index into the full list of brokers > doesn't seem to have the same meaning as the index into a rack alternating > list. > I discovered this when I published > [https://github.com/apache/kafka/pull/12943/files]. In that PR I added a test > testRackAwarePartitionAssignment which does not work for ZK mode.
[jira] [Created] (KAFKA-14456) Fix AdminUtils startIndex for rack aware partition creations
Andrew Grant created KAFKA-14456: Summary: Fix AdminUtils startIndex for rack aware partition creations Key: KAFKA-14456 URL: https://issues.apache.org/jira/browse/KAFKA-14456 Project: Kafka Issue Type: Improvement Reporter: Andrew Grant When new partitions are added/created we calculate a start index based off all the brokers here [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L270]. That start index is passed through to AdminUtils and is used to find a starting position in the list of brokers for making assignments. However, when we make rack aware assignments we use that index into a rack alternating list here [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L160]. The meaning of the index gets lost: the index into the full list of brokers doesn't seem to have the same meaning as the index into a rack alternating list. I discovered this when I published [https://github.com/apache/kafka/pull/12943/files]. In that PR I added a test testRackAwarePartitionAssignment which does not work for ZK mode.
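The index mismatch described in this ticket can be reproduced with a toy example. The sketch below builds a rack-alternating list with a simplified round-robin interleave and shows that the same index selects different brokers in the two lists. `StartIndexSketch` and `rackAlternated` are hypothetical and intentionally simpler than the real `AdminUtils` logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a rack-alternating broker list.
final class StartIndexSketch {
    // Interleaves brokers round-robin across racks (racks and broker ids in sorted order).
    // Assumes every broker has a non-null rack.
    static List<Integer> rackAlternated(Map<Integer, String> brokerRacks) {
        Map<String, List<Integer>> byRack = new TreeMap<>();
        for (Map.Entry<Integer, String> e : new TreeMap<>(brokerRacks).entrySet()) {
            byRack.computeIfAbsent(e.getValue(), rack -> new ArrayList<>()).add(e.getKey());
        }
        List<Integer> out = new ArrayList<>();
        for (int round = 0; out.size() < brokerRacks.size(); round++) {
            for (List<Integer> brokers : byRack.values()) {
                if (round < brokers.size()) {
                    out.add(brokers.get(round));
                }
            }
        }
        return out;
    }
}
```

With brokers 0 and 1 in rack A and brokers 2 and 3 in rack B, the sorted broker list is `[0, 1, 2, 3]` while this interleave gives `[0, 2, 1, 3]`, so a start index of 1 computed against the first list points at broker 1 there but at broker 2 in the second.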
[GitHub] [kafka] lucasbru commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
lucasbru commented on code in PR #12935: URL: https://github.com/apache/kafka/pull/12935#discussion_r1043442711 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -526,6 +521,9 @@ public synchronized void close() { fOptions.close(); filter.close(); cache.close(); +if (statistics != null) { +statistics.close(); +} Review Comment: I think you documented it already in the javadoc for `close` https://github.com/apache/kafka/pull/6697
[GitHub] [kafka] bbejeck commented on a diff in pull request #12962: KAFKA-14318: KIP-878, Introduce partition autoscaling configs
bbejeck commented on code in PR #12962: URL: https://github.com/apache/kafka/pull/12962#discussion_r1043422635 ## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ## @@ -1368,6 +1370,20 @@ public void shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() { assertEquals(0, configs.size()); } +@Test +public void shouldEnablePartitionAutoscaling() { +props.put("partition.autoscaling.enabled", true); +final StreamsConfig config = new StreamsConfig(props); +assertTrue(config.getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG)); +} + +@Test +public void shouldSetPartitionAutoscalingTimeout() { +props.put("partition.autoscaling.timeout.ms", 0L); +final StreamsConfig config = new StreamsConfig(props); +assertThat(config.getBoolean(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG), is(0L)); Review Comment: oops! `config.getLong` here
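Why the getter matters: typed getters of this kind usually cast the stored value, so reading a `Long` setting through a boolean getter fails at runtime rather than at compile time. The sketch below uses a hypothetical `TypedConfigSketch` class as a simplified stand-in; it is not `StreamsConfig`, and only the property name is taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified config holder with cast-based typed getters.
final class TypedConfigSketch {
    private final Map<String, Object> values = new HashMap<>();

    TypedConfigSketch put(String key, Object value) {
        values.put(key, value);
        return this;
    }

    // Throws ClassCastException if the stored value is not a Boolean.
    Boolean getBoolean(String key) {
        return (Boolean) values.get(key);
    }

    // Throws ClassCastException if the stored value is not a Long.
    Long getLong(String key) {
        return (Long) values.get(key);
    }

    public static void main(String[] args) {
        TypedConfigSketch config = new TypedConfigSketch()
                .put("partition.autoscaling.timeout.ms", 0L);
        System.out.println(config.getLong("partition.autoscaling.timeout.ms"));
        try {
            config.getBoolean("partition.autoscaling.timeout.ms");
        } catch (ClassCastException e) {
            System.out.println("wrong getter for a Long-valued setting");
        }
    }
}
```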
[GitHub] [kafka] justinrlee commented on pull request #12797: MINOR: Remove requirement to specify --bootstrap-server in utility scripts if specified in property file
justinrlee commented on PR #12797: URL: https://github.com/apache/kafka/pull/12797#issuecomment-1342751002 Okay, I can remove the argument section. If I do that, you think we can get away without a KIP? What are your thoughts on my error state and the way the CLI reacts without bootstrap.server?
[GitHub] [kafka] cadonna commented on pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
cadonna commented on PR #12935: URL: https://github.com/apache/kafka/pull/12935#issuecomment-1342749417 I agree! I will backport this commit.
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644810#comment-17644810 ]

Mickael Maison commented on KAFKA-12635:

The fix only ensured new offsets will be correct. If you have existing negative offsets, you either have to wait till a new offset is emitted or you can manually update them using the Admin client or the kafka-consumer-groups tool for example.

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> ---
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.7.0
> Reporter: Frank Yi
> Assignee: Mickael Maison
> Priority: Major
> Fix For: 3.3.0
>
> Attachments: image-2022-11-02-11-53-33-329.png, image-2022-11-02-11-56-34-994.png, screenshot-1.png
>
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = true".
> If a source partition is empty, but the source consumer group's offset for that partition is non-zero, then Mirrormaker sets the target consumer group's offset for that partition to the literal, not translated, offset of the source consumer group. This state can be reached if the source consumer group consumed some records that were now deleted (like by a retention policy), or if Mirrormaker replication is set to start at "latest". This bug causes the target consumer group's lag for that partition to be negative and breaks offset sync for that partition until lag is positive.
> The correct behavior when the source partition is empty would be to set the target offset to the translated offset, not literal offset, which in this case would always be 0.
> Original email thread on this issue:
> https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E

--
This message was sent by Atlassian Jira (v8.20.10#820010)
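To decide which partitions need the manual update mentioned in the comment above, it can help to list the ones whose lag is negative. The following is only a minimal sketch under stated assumptions: `NegativeLagSketch`, `negativeLags`, and the use of plain string keys for partitions are hypothetical names chosen for illustration, and the offsets are assumed to have been fetched beforehand (for example with the Admin client or the kafka-consumer-groups tool). It does not call any Kafka API.

```java
import java.util.Map;
import java.util.TreeMap;

public class NegativeLagSketch {

    /**
     * Returns the partitions whose committed offset lies beyond the log end offset,
     * mapped to their (negative) lag. Lag is computed as logEndOffset - committedOffset.
     * Partitions without a known log end offset are skipped.
     */
    public static Map<String, Long> negativeLags(Map<String, Long> logEndOffsets,
                                                 Map<String, Long> committedOffsets) {
        // TreeMap keeps the result sorted by partition name for stable output.
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, Long> entry : committedOffsets.entrySet()) {
            Long logEnd = logEndOffsets.get(entry.getKey());
            if (logEnd == null) {
                continue; // no end offset known for this partition
            }
            long lag = logEnd - entry.getValue();
            if (lag < 0) {
                result.put(entry.getKey(), lag);
            }
        }
        return result;
    }
}
```

For an empty target partition (log end offset 0) with a literal source offset of 57 committed, the sketch reports a lag of -57, which matches the symptom described in the issue; the issue states that the correct committed offset in that case would be 0.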
[jira] [Created] (KAFKA-14455) Kafka Connect create and update REST APIs should surface failures while writing to the config topic
Yash Mayya created KAFKA-14455:
--

Summary: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
Key: KAFKA-14455
URL: https://issues.apache.org/jira/browse/KAFKA-14455
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya

Kafka Connect's `POST /connectors` and `PUT /connectors/{connector}/config` REST APIs internally simply write a message to the Connect cluster's internal config topic (which is then processed asynchronously by the herder). However, no callback is passed to the producer's send method and there is no error handling in place for producer send failures (see [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716] / [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]).

Consider one such case where the Connect worker's principal doesn't have a WRITE ACL on the cluster's config topic. Now suppose the user submits a connector's configs via one of the above two APIs. The producer send [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716] / [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726] won't succeed (due to a TopicAuthorizationException) but the API responses will be `201 Created` success responses anyway. This is a very poor UX because the connector will never actually be created even though the API response indicated success.

Furthermore, this failure would only be detectable if TRACE logs are enabled (via [this log|https://github.com/apache/kafka/blob/df29b17fc40f7c15460988d58bc652c3d66b60f8/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java]), making it nearly impossible for users to debug. Producer callbacks should be used to surface write failures back to the user via the API response.
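The proposal in this issue (use a send callback so that a failed write reaches the API response) can be illustrated with a small, self-contained sketch. Everything here is an assumption made for illustration: `AsyncSender` is a hypothetical stand-in for a callback-style producer, not Kafka's producer API, and the `500` status for failures and the timeout handling are placeholders rather than what Connect does or will do.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class ConfigWriteSketch {

    /** Hypothetical async sender: reports either an offset or an error through the callback. */
    public interface AsyncSender {
        void send(String key, String value, BiConsumer<Long, Exception> callback);
    }

    /** Bridges the callback into a future so that the caller can observe the write outcome. */
    public static CompletableFuture<Long> write(AsyncSender sender, String key, String value) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        sender.send(key, value, (offset, error) -> {
            if (error != null) {
                result.completeExceptionally(error); // surface the failure instead of dropping it
            } else {
                result.complete(offset);
            }
        });
        return result;
    }

    /**
     * Maps the write outcome to an HTTP status: 201 only when the write was acknowledged.
     * The 500 used for failures and timeouts is an assumption of this sketch.
     */
    public static int statusFor(CompletableFuture<Long> write, long timeoutMs) {
        try {
            write.get(timeoutMs, TimeUnit.MILLISECONDS);
            return 201;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return 500;
        } catch (Exception e) {
            return 500; // send failure or timeout
        }
    }
}
```

With a sender that reports an authorization-style error, `statusFor` returns the failure status rather than `201`, which is the behavior the issue asks for. Without the callback, the error never reaches the code that builds the response.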
[GitHub] [kafka] satishd commented on a diff in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
satishd commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r1043330520

## core/src/main/scala/kafka/log/remote/RemoteLogManager.scala: ##
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - receives any leader and follower replica events and partition stop events and act on them
+ *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *
+ * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
+ * @param brokerId  id of the current broker.
+ * @param logDir    directory of Kafka log segments.
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+                       brokerId: Int,
+                       logDir: String) extends Logging with Closeable with KafkaMetricsGroup {
+
+  // topic ids received on leadership changes
+  private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]()
+
+  private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager()
+  private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager()
+
+  private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir)
+
+  private var closed = false
+
+  private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+    def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+      classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+        .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+      private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+      override def run(): RemoteStorageManager = {
+        if (classPath != null && classPath.trim.nonEmpty) {
+          val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+          val delegate = createDelegate(classLoader)
+          new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+        } else {
+          createDelegate(this.getClass.getClassLoader)
+        }
+      }
+    })
+  }
+
+  private def configureRSM(): Unit = {
+    val rsmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) }
+    rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    remoteLogStorageManager.configure(rsmProps)
+  }
+
+  private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = {
+    def createDelegate(classLoader: ClassLoader) = {
+      classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[RemoteLogMetadataManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] {
+      private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
+
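The `createDelegate` helpers in the quoted diff load a plugin class by name through a chosen class loader and instantiate it with its no-argument constructor, using a dedicated loader only when a class path is configured. A minimal Java sketch of that pattern follows. `PluginLoaderSketch`, `loaderFor`, and the generic `createDelegate` signature are hypothetical names for illustration, and `URLClassLoader` is swapped in for Kafka's `ChildFirstClassLoader`. Note that `URLClassLoader` delegates to its parent first, so it does not reproduce the child-first isolation of the original.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.regex.Pattern;

public class PluginLoaderSketch {

    /** Uses a dedicated loader only when a non-blank class path is configured. */
    public static ClassLoader loaderFor(String classPath, ClassLoader parent) {
        if (classPath == null || classPath.trim().isEmpty()) {
            return parent;
        }
        String[] entries = classPath.trim().split(Pattern.quote(File.pathSeparator));
        URL[] urls = new URL[entries.length];
        try {
            for (int i = 0; i < entries.length; i++) {
                urls[i] = new File(entries[i]).toURI().toURL();
            }
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid class path entry in: " + classPath, e);
        }
        // Parent-first stand-in; the diff uses a child-first loader instead.
        return new URLClassLoader(urls, parent);
    }

    /** Loads className through the given loader and instantiates it via its no-arg constructor. */
    public static <T> T createDelegate(ClassLoader classLoader, String className, Class<T> type) {
        try {
            Class<?> loaded = classLoader.loadClass(className);
            Object instance = loaded.getDeclaredConstructor().newInstance();
            return type.cast(instance); // fails fast if the class does not implement the expected type
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }
}
```

As a usage example, `createDelegate(loaderFor(null, parent), "java.util.ArrayList", java.util.List.class)` falls back to the parent loader and yields a fresh `ArrayList`.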
[GitHub] [kafka] lucasbru commented on pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
lucasbru commented on PR #12935:
URL: https://github.com/apache/kafka/pull/12935#issuecomment-1342589776

> If there are any other/older branches you think this should go to, I'll leave it to @cadonna to port the fix as I suspect there could be nontrivial merge conflicts

Yes, hmm we probably have to port this back to all branches that got the `RocksDB` upgrade, right?
[GitHub] [kafka] cadonna commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
cadonna commented on code in PR #12935:
URL: https://github.com/apache/kafka/pull/12935#discussion_r1043248049

## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ##
@@ -229,40 +230,34 @@ void openDB(final Map configs, final File stateDir) {
 
         // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpStatistics(configs);
-
+        statistics = userSpecifiedOptions.statistics();
+        if (statistics == null) {
+            if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
+                statistics = new Statistics();
+                dbOptions.setStatistics(statistics);
+            }
+            userSpecifiedStatistics = false;
+        } else {
+            userSpecifiedStatistics = true;
+        }

Review Comment:
   No comment!
[GitHub] [kafka] showuon closed pull request #12631: [WIP] MINOR: building
showuon closed pull request #12631: [WIP] MINOR: building URL: https://github.com/apache/kafka/pull/12631
[GitHub] [kafka] dajac closed pull request #12897: KAFKA-14379: consumer should refresh preferred read replica on update metadata
dajac closed pull request #12897: KAFKA-14379: consumer should refresh preferred read replica on update metadata URL: https://github.com/apache/kafka/pull/12897