[GitHub] [kafka] chia7712 commented on a change in pull request #10084: MINOR: Rename DecommissionBrokers to UnregisterBrokers
chia7712 commented on a change in pull request #10084: URL: https://github.com/apache/kafka/pull/10084#discussion_r572633977 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -217,6 +217,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.ENVELOPE => handleEnvelope(request) case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request) case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request) +case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest) Review comment: @cmccabe Thanks for the explanation. It seems the [comment](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L221) is now invalid. ``` Handle requests that should have been sent to the KIP-500 controller ``` Could you revise the comment as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException
[ https://issues.apache.org/jira/browse/KAFKA-12315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-12315: --- Description: As shown in the attached sequence diagram, during topic deletion the following sequence of events can happen:
1. The ZkReplicaStateMachine calls AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and adds some entries to its stopReplicaRequestMap
2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr
3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a ControllerMovedException may be thrown due to a zkVersion check failure
4. The ControllerMovedException is captured by the ZkPartitionStateMachine and an error such as the following is created:
{code:java}
2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state
org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139{code}
5. The ControllerMovedException is rethrown and captured by the KafkaController, which will resign
At this point the controller has resigned; however, the stopReplicaRequestMap state populated in step 1 hasn't been cleared. Later on, when the controller wins an election and becomes the active controller again, an IllegalStateException will be triggered due to the leftover state:
{code:java}
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Error while moving some replicas to OnlineReplica state
java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some StopReplica state changes Map(6121 -> ListBuffer(StopReplicaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost
at kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.elect(KafkaController.scala:1484) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.process(KafkaController.scala:2065) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137) ~[kafka_2.12-2.4.1.10.jar:?]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [scala-library-2.12.10.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) [kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137) [kafka_2.12-2.4.1.10.jar:?]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [kafka_2.12-2.4.1.10.jar:?]{code}
Essentially, the controller is not able to transition some replicas to OnlineReplica state, and it cannot send any requests to any brokers via the ReplicaStateMachine.
was: As shown in the attached sequence diagram, during topic deletion the following sequence of events can happen:
1. The ZkReplicaStateMachine calls AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and adds some entries to its stopReplicaRequestMap
2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr
3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a ControllerMovedException may be thrown due to a zkVersion check failure
4. The ControllerMovedException is captured by the ZkPartitionStateMachine and an error such as the following is created: 2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139
5. The ControllerMovedException is rethrown and captured by the KafkaController, which will resign
At this point the controller has resigned; however, the stopReplicaRequestMap state populated in step 1 hasn't been cleared. Later on, when the controller wins an election and becomes the active controller again, an IllegalStateException will be triggered due to the leftover state: {code:java} 2021/02/05 16:04:33.193 [ZkReplicaStateMachine]
[GitHub] [kafka] cmccabe commented on a change in pull request #10084: MINOR: Rename DecommissionBrokers to UnregisterBrokers
cmccabe commented on a change in pull request #10084: URL: https://github.com/apache/kafka/pull/10084#discussion_r572627716 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -217,6 +217,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.ENVELOPE => handleEnvelope(request) case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request) case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request) +case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest) Review comment: This call needs to be forwarded to the KIP-500 controller. That is different from the other KIP-500 RPCs, which are controller RPCs and are not expected to be received on the broker at all.
[GitHub] [kafka] chia7712 opened a new pull request #10086: MINOR: expose number of forwarding requests to metrics
chia7712 opened a new pull request #10086: URL: https://github.com/apache/kafka/pull/10086 The number of forwarding requests used to be an exposed metrics value (#9580 removed it). The number of requests in a queue is an important metric (for example, the request queue in `SocketServer`), so we should expose it.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
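A queue-depth metric of the kind the PR describes can be sketched as a gauge that reads the live queue size on demand, the same pattern `SocketServer` uses for its request queue. The class and method names below are hypothetical illustrations, not the PR's actual code:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical sketch: a forwarding channel exposing its pending-request
// count through a gauge-style supplier instead of a manually updated counter.
class ForwardingChannel {
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();

    void enqueue(String request) { pending.add(request); }

    String poll() { return pending.poll(); }

    // The gauge samples the queue at read time, so it can never drift
    // out of sync with the queue's real contents.
    Supplier<Integer> queuedRequestsGauge() { return pending::size; }
}
```

Reading the size lazily is the usual choice for queue metrics: a separate counter incremented on enqueue and decremented on dequeue can go stale if any path forgets one of the two updates.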
[jira] [Created] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException
Lucas Wang created KAFKA-12315: -- Summary: Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException Key: KAFKA-12315 URL: https://issues.apache.org/jira/browse/KAFKA-12315 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Attachments: controller_moved_left_over_state.png
As shown in the attached sequence diagram, during topic deletion the following sequence of events can happen:
1. The ZkReplicaStateMachine calls AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and adds some entries to its stopReplicaRequestMap
2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr
3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a ControllerMovedException may be thrown due to a zkVersion check failure
4. The ControllerMovedException is captured by the ZkPartitionStateMachine and an error such as the following is created: 2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139
5. The ControllerMovedException is rethrown and captured by the KafkaController, which will resign
At this point the controller has resigned; however, the stopReplicaRequestMap state populated in step 1 hasn't been cleared. Later on, when the controller wins an election and becomes the active controller again, an IllegalStateException will be triggered due to the leftover state:
```
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Error while moving some replicas to OnlineReplica state
java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some StopReplica state changes Map(6121 -> ListBuffer(StopReplicaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost
at kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.elect(KafkaController.scala:1484) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.process(KafkaController.scala:2065) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137) ~[kafka_2.12-2.4.1.10.jar:?]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [scala-library-2.12.10.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) [kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137) [kafka_2.12-2.4.1.10.jar:?]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [kafka_2.12-2.4.1.10.jar:?]
```
Essentially, the controller is not able to transition some replicas to OnlineReplica state, and it cannot send any requests to any brokers via the ReplicaStateMachine. -- This message was sent by Atlassian Jira (v8.3.4#803005)
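The guard that throws the IllegalStateException above can be reduced to a small sketch. This is hypothetical Java, not the Scala controller code: a batch that refuses to start a new batch while leftover entries exist, plus the clear-on-resignation step the ticket proposes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the failure mode reported in KAFKA-12315: a request
// batch mirroring AbstractControllerBrokerRequestBatch.newBatch's guard.
class RequestBatch {
    private final List<String> stopReplicaRequests = new ArrayList<>();

    void addStopReplicaRequest(String request) { stopReplicaRequests.add(request); }

    // newBatch throws if entries from a previous (aborted) batch remain,
    // because silently discarding them would lose state changes.
    void newBatch() {
        if (!stopReplicaRequests.isEmpty())
            throw new IllegalStateException(
                "requests batch is not empty while creating a new one; " +
                stopReplicaRequests + " might be lost");
    }

    // The proposed fix: clear leftover state when the controller resigns
    // on ControllerMovedException, so the next failover starts clean.
    void clearOnResignation() { stopReplicaRequests.clear(); }
}
```

With this shape, calling `clearOnResignation()` in the ControllerMovedException handler makes the later `newBatch()` on re-election succeed instead of throwing.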
[jira] [Resolved] (KAFKA-10835) Replace Runnable and Callable overrides with lambdas in Connect
[ https://issues.apache.org/jira/browse/KAFKA-10835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-10835. Fix Version/s: 2.8.0 Resolution: Fixed > Replace Runnable and Callable overrides with lambdas in Connect > --- > > Key: KAFKA-10835 > URL: https://issues.apache.org/jira/browse/KAFKA-10835 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Konstantine Karantasis >Assignee: Lev Zemlyanov >Priority: Minor > Fix For: 2.8.0 > > > We've been using Java 8 for some time now, of course. Replacing the overrides > from the pre-Java 8 era will simplify some parts of the code and will reduce > verbosity.
[jira] [Resolved] (KAFKA-10834) Remove redundant type casts in Connect
[ https://issues.apache.org/jira/browse/KAFKA-10834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-10834. Fix Version/s: 2.8.0 Resolution: Fixed > Remove redundant type casts in Connect > -- > > Key: KAFKA-10834 > URL: https://issues.apache.org/jira/browse/KAFKA-10834 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Konstantine Karantasis >Assignee: Lev Zemlyanov >Priority: Minor > Fix For: 2.8.0 > > > Some type casts in the code base are not required any more and can be > removed.
[jira] [Commented] (KAFKA-12314) Leverage custom comparator for optimized range scans on RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-12314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281526#comment-17281526 ] Sagar Rao commented on KAFKA-12314: --- hey [~ableegoldman], is this something I can take up? > Leverage custom comparator for optimized range scans on RocksDB > --- > > Key: KAFKA-12314 > URL: https://issues.apache.org/jira/browse/KAFKA-12314 > Project: Kafka > Issue Type: Improvement >Reporter: A. Sophie Blee-Goldman >Priority: Major > > Currently our SessionStore has poor performance on any range scans due to the > byte layout and possibility of varyingly sized keys. A session window > consists of the key and two timestamps, the windowEnd and windowStart. This > data is formatted as > [key, windowEnd, windowStart] > The default comparator in rocksdb is lexicographical, and so it compares > bytes starting with the key. This means with the above format, the records > are effectively sorted first by key and then by windowEnd. But if two keys > are of different lengths, the comparator will start on the left and end up > comparing the tail bytes of the longer key against the windowEnd timestamp of > the shorter key. Due to this, we have to set the bounds on SessionStore range > scans very conservatively, which means we end up reading way more data than > we need. > One way out of this would be to use a custom comparator which understands the > window bytes format we use. So far we haven't done this because of the > overhead in crossing the JNI with the Java Comparator; we would need a native > comparator to avoid further performance hit.
[jira] [Resolved] (KAFKA-10866) Add fetched metadata to ConsumerRecords
[ https://issues.apache.org/jira/browse/KAFKA-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10866. -- Resolution: Fixed > Add fetched metadata to ConsumerRecords > --- > > Key: KAFKA-10866 > URL: https://issues.apache.org/jira/browse/KAFKA-10866 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.8.0 > > > Consumer-side changes for KIP-695
[jira] [Updated] (KAFKA-10866) Add fetched metadata to ConsumerRecords
[ https://issues.apache.org/jira/browse/KAFKA-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10866: - Priority: Major (was: Blocker) > Add fetched metadata to ConsumerRecords > --- > > Key: KAFKA-10866 > URL: https://issues.apache.org/jira/browse/KAFKA-10866 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.8.0 > > > Consumer-side changes for KIP-695
[jira] [Updated] (KAFKA-10866) Add fetched metadata to ConsumerRecords
[ https://issues.apache.org/jira/browse/KAFKA-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10866: - Priority: Blocker (was: Major) > Add fetched metadata to ConsumerRecords > --- > > Key: KAFKA-10866 > URL: https://issues.apache.org/jira/browse/KAFKA-10866 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.8.0 > > > Consumer-side changes for KIP-695
[jira] [Reopened] (KAFKA-10866) Add fetched metadata to ConsumerRecords
[ https://issues.apache.org/jira/browse/KAFKA-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reopened KAFKA-10866: -- > Add fetched metadata to ConsumerRecords > --- > > Key: KAFKA-10866 > URL: https://issues.apache.org/jira/browse/KAFKA-10866 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.8.0 > > > Consumer-side changes for KIP-695
[jira] [Commented] (KAFKA-12314) Leverage custom comparator for optimized range scans on RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-12314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281502#comment-17281502 ] A. Sophie Blee-Goldman commented on KAFKA-12314: rocksdb has recently merged some improvements to the performance of their Java API. Once we upgrade our rocksdb dependency, we should investigate whether it would be possible to implement a more efficient comparator and then tighten up the range scan bounds. > Leverage custom comparator for optimized range scans on RocksDB > --- > > Key: KAFKA-12314 > URL: https://issues.apache.org/jira/browse/KAFKA-12314 > Project: Kafka > Issue Type: Improvement >Reporter: A. Sophie Blee-Goldman >Priority: Major > > Currently our SessionStore has poor performance on any range scans due to the > byte layout and possibility of varyingly sized keys. A session window > consists of the key and two timestamps, the windowEnd and windowStart. This > data is formatted as > [key, windowEnd, windowStart] > The default comparator in rocksdb is lexicographical, and so it compares > bytes starting with the key. This means with the above format, the records > are effectively sorted first by key and then by windowEnd. But if two keys > are of different lengths, the comparator will start on the left and end up > comparing the tail bytes of the longer key against the windowEnd timestamp of > the shorter key. Due to this, we have to set the bounds on SessionStore range > scans very conservatively, which means we end up reading way more data than > we need. > One way out of this would be to use a custom comparator which understands the > window bytes format we use. So far we haven't done this because of the > overhead in crossing the JNI with the Java Comparator; we would need a native > comparator to avoid further performance hit.
[GitHub] [kafka] chia7712 merged pull request #10012: MINOR: Refactor return statement and log info
chia7712 merged pull request #10012: URL: https://github.com/apache/kafka/pull/10012
[jira] [Created] (KAFKA-12314) Leverage custom comparator for optimized range scans on RocksDB
A. Sophie Blee-Goldman created KAFKA-12314: -- Summary: Leverage custom comparator for optimized range scans on RocksDB Key: KAFKA-12314 URL: https://issues.apache.org/jira/browse/KAFKA-12314 Project: Kafka Issue Type: Improvement Reporter: A. Sophie Blee-Goldman Currently our SessionStore has poor performance on any range scans due to the byte layout and possibility of varyingly sized keys. A session window consists of the key and two timestamps, the windowEnd and windowStart. This data is formatted as [key, windowEnd, windowStart] The default comparator in rocksdb is lexicographical, and so it compares bytes starting with the key. This means with the above format, the records are effectively sorted first by key and then by windowEnd. But if two keys are of different lengths, the comparator will start on the left and end up comparing the tail bytes of the longer key against the windowEnd timestamp of the shorter key. Due to this, we have to set the bounds on SessionStore range scans very conservatively, which means we end up reading way more data than we need. One way out of this would be to use a custom comparator which understands the window bytes format we use. So far we haven't done this because of the overhead in crossing the JNI with the Java Comparator; we would need a native comparator to avoid further performance hit.
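The mis-ordering the ticket describes is easy to reproduce with a plain lexicographic byte comparator. The sketch below is illustrative Java, not Streams' actual serialization code: with the `[key, windowEnd, windowStart]` layout, whether a record for key "a" sorts before or after one for key "ab" depends on the windowEnd timestamp bytes rather than on the key.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of the [key, windowEnd, windowStart] layout and why
// RocksDB's default lexicographic byte comparison breaks down when keys
// have different lengths: a key byte gets compared against a timestamp byte.
class SessionKeyLayout {
    static byte[] encode(String key, long windowEnd, long windowStart) {
        ByteBuffer buf = ByteBuffer.allocate(key.length() + 2 * Long.BYTES);
        buf.put(key.getBytes(StandardCharsets.US_ASCII));
        buf.putLong(windowEnd);   // big-endian (ByteBuffer default)
        buf.putLong(windowStart);
        return buf.array();
    }

    // RocksDB's default comparator: unsigned lexicographic byte order.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
    }
}
```

At byte index 1, the "a" record contributes the top byte of its windowEnd while the "ab" record contributes the letter 'b' (0x62), so the relative order of the two keys flips with the timestamp value; that is exactly why range-scan bounds must be set so conservatively.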
[GitHub] [kafka] tang7526 commented on a change in pull request #10012: MINOR: Refactor return statement and log info
tang7526 commented on a change in pull request #10012: URL: https://github.com/apache/kafka/pull/10012#discussion_r572543598 ## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ## @@ -1334,4 +1334,13 @@ public static long getDateTime(String timestamp) throws ParseException, IllegalA public static Iterator covariantCast(Iterator iterator) { return (Iterator) iterator; } + +/** + * Checks if a string is null, empty or whitespace only. + * @param str a string to be checked + * @return true if the string is null, empty or whitespace only; otherwise, return false. + */ +public static boolean isBlank(String str) { Review comment: > How about applying this helper method to the code base? @chia7712 OK. I will do it in the next PR.
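The diff above shows only the javadoc and signature of the helper under review. A plausible body matching that contract is sketched below; this is a guess at the implementation, hedged, since the PR's actual code is not quoted here:

```java
// Hypothetical sketch of the isBlank helper discussed in PR #10012:
// true for null, empty, or whitespace-only strings, per its javadoc.
class StringUtils {
    static boolean isBlank(String str) {
        // trim() strips leading/trailing whitespace, so a whitespace-only
        // string trims down to the empty string.
        return str == null || str.trim().isEmpty();
    }
}
```

A `str == null` check must come first so the whitespace test never dereferences a null reference.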
[GitHub] [kafka] chia7712 commented on a change in pull request #10083: MINOR: Remove check for comparision of 3rd IP of kafka.apache.org
chia7712 commented on a change in pull request #10083: URL: https://github.com/apache/kafka/pull/10083#discussion_r572542393 ## File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java ## @@ -310,9 +310,6 @@ public void testMultipleIPsWithUseAll() throws UnknownHostException { connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS); InetAddress addr2 = connectionStates.currentAddress(nodeId1); Review comment: Relying on a specific number of IPs is still not stable for this test (for example, the number of resolved IPs may become 1). It seems to me this test should check whether the number of used addresses (after building many connections) is equal to the number of resolved addresses.
[jira] [Created] (KAFKA-12313) Consider deprecating the default.windowed.serde.inner.class configs
A. Sophie Blee-Goldman created KAFKA-12313: -- Summary: Consider deprecating the default.windowed.serde.inner.class configs Key: KAFKA-12313 URL: https://issues.apache.org/jira/browse/KAFKA-12313 Project: Kafka Issue Type: Improvement Reporter: A. Sophie Blee-Goldman During the discussion of KIP-659 we discussed whether it made sense to have a "default" class for the serdes of windowed inner classes across Streams. Using these configs instead of specifying an actual Serde object can lead to subtle bugs, since the WindowedDeserializer requires a windowSize in addition to the inner class. If the default constructor is invoked, as it will be when falling back on the config, this windowSize defaults to MAX_VALUE. If the downstream program doesn't care about the window end time in the output, then this can go unnoticed and technically there is no problem. But if anything does depend on the end time, or the user just wants to manually read the output for testing purposes, then the MAX_VALUE will result in a garbage timestamp. We should consider whether the convenience of specifying a config instead of instantiating a Serde in each operator is really worth the risk of a user accidentally failing to specify a windowSize.
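The garbage end-timestamp failure mode can be shown with a couple of lines of arithmetic. This is a hypothetical sketch, not Streams source; the overflow cap here is an assumption of the sketch, made so the computed end time stays a valid (if meaningless) timestamp:

```java
// Hypothetical sketch of the KAFKA-12313 pitfall: a window's end time is
// windowStart + windowSize, and when windowSize silently defaults to
// Long.MAX_VALUE the "end" is pinned at Long.MAX_VALUE, not a real timestamp.
class WindowEnd {
    static long endFor(long windowStart, long windowSize) {
        long end = windowStart + windowSize;
        // Cap at Long.MAX_VALUE on overflow (an assumption of this sketch).
        return end < windowStart ? Long.MAX_VALUE : end;
    }
}
```

With an explicitly constructed serde the end time is meaningful (start 1000 + size 5000 gives 6000), while the config fallback's default size of `Long.MAX_VALUE` yields `Long.MAX_VALUE` for every record, which is exactly the "garbage timestamp" the ticket warns about.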
[GitHub] [kafka] chia7712 commented on a change in pull request #10084: MINOR: Rename DecommissionBrokers to UnregisterBrokers
chia7712 commented on a change in pull request #10084: URL: https://github.com/apache/kafka/pull/10084#discussion_r572538990 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -217,6 +217,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.ENVELOPE => handleEnvelope(request) case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request) case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request) +case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest) Review comment: The other APIs from KIP-500 are just closed. Maybe this API could follow the same pattern?
[jira] [Updated] (KAFKA-8575) Investigate removing EAGER protocol & cleaning up task suspension in Streams rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-8575: -- Fix Version/s: (was: 3.0.0) 3.1.0 > Investigate removing EAGER protocol & cleaning up task suspension in Streams > rebalancing > - > > Key: KAFKA-8575 > URL: https://issues.apache.org/jira/browse/KAFKA-8575 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.4.0 >Reporter: A. Sophie Blee-Goldman >Priority: Major > Fix For: 3.1.0 > > > With KIP-429 the suspend/resume of tasks may have minimal gains while adding > a lot of complexity and potential bugs. We should consider removing/cleaning > it up. > We should also consider removing EAGER rebalancing from Streams entirely, if > results indicate that COOPERATIVE displays superior performance.
[GitHub] [kafka] chia7712 commented on a change in pull request #10012: MINOR: Refactor return statement and log info
chia7712 commented on a change in pull request #10012: URL: https://github.com/apache/kafka/pull/10012#discussion_r572536164 ## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ## @@ -1334,4 +1334,13 @@ public static long getDateTime(String timestamp) throws ParseException, IllegalA public static Iterator covariantCast(Iterator iterator) { return (Iterator) iterator; } + +/** + * Checks if a string is null, empty or whitespace only. + * @param str a string to be checked + * @return true if the string is null, empty or whitespace only; otherwise, return false. + */ +public static boolean isBlank(String str) { Review comment: How about applying this helper method to the code base? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
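The `isBlank` helper under review can be sketched in isolation as follows; `StringUtils` is a hypothetical holder class (the real method lives in Kafka's `Utils`), and only the method contract follows the javadoc quoted in the diff above:

```java
// Standalone sketch of the isBlank helper reviewed above. StringUtils is a
// hypothetical holder class; the behavior mirrors the quoted javadoc.
public class StringUtils {

    /**
     * Checks if a string is null, empty, or whitespace only.
     * @param str a string to be checked
     * @return true if the string is null, empty, or whitespace only; false otherwise
     */
    public static boolean isBlank(String str) {
        return str == null || str.trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(isBlank(null));    // true
        System.out.println(isBlank(" \t "));  // true
        System.out.println(isBlank("kafka")); // false
    }
}
```

Replacing scattered `str == null || str.trim().isEmpty()` checks with one shared helper is exactly the kind of cleanup the reviewer is suggesting.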
[jira] [Commented] (KAFKA-12312) kafka_2.13:2.6.1 throws NoSuchMethodError when running against scala-sdk-2.13.4
[ https://issues.apache.org/jira/browse/KAFKA-12312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281487#comment-17281487 ] Luke Chen commented on KAFKA-12312: --- [~tkornai], we applied Scala 2.13.4 and fixed the compatibility issues on the latest trunk branch, which should be in the 2.8 release. I'll ask whether we want to cherry-pick this fix into the 2.6.2 release. Thanks for reporting. > kafka_2.13:2.6.1 throws NoSuchMethodError when running against > scala-sdk-2.13.4 > --- > > Key: KAFKA-12312 > URL: https://issues.apache.org/jira/browse/KAFKA-12312 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.0, 2.6.1 >Reporter: Tamas Kornai >Priority: Minor > > The below snippet runs without issues with {{scala-sdk-2.13.3}}, but throws > {{NoSuchMethodError}} for {{scala-sdk-2.13.4}}: > {quote}val authorize = new AclAuthorizer() > val acls = authorize.acls(AclBindingFilter.ANY) > {quote} > The error is: > {quote}Exception in thread "main" java.lang.NoSuchMethodError: > 'scala.collection.immutable.RedBlackTree$Tree > scala.collection.immutable.TreeMap.scala$collection$immutable$TreeMap$$tree()' > at kafka.security.authorizer.AclAuthorizer.acls(AclAuthorizer.scala:293) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
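The `NoSuchMethodError` above is a binary-compatibility failure: `AclAuthorizer` was compiled against a `TreeMap` internal that Scala 2.13.4 no longer exposes. One hedged way to diagnose such mismatches is to check at runtime whether the class on the classpath actually declares the method the compiled caller expects; the sketch below is generic plain-reflection code, not tied to Kafka or Scala internals:

```java
import java.lang.reflect.Method;

// Illustrative helper for diagnosing NoSuchMethodError: check whether the
// class loaded at runtime declares the method a caller was compiled against.
public class MethodPresenceCheck {

    static boolean declaresMethod(Class<?> cls, String name) {
        for (Method m : cls.getDeclaredMethods()) {
            if (m.getName().equals(name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // String.isEmpty exists on every supported JDK; a binary-incompatible
        // dependency would make a check like this return false for the method
        // the compiled caller expects.
        System.out.println(declaresMethod(String.class, "isEmpty")); // true
    }
}
```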
[GitHub] [kafka] gitlw commented on pull request #7670: KAFKA-7016: Not hide the stack trace for ApiException
gitlw commented on pull request #7670: URL: https://github.com/apache/kafka/pull/7670#issuecomment-775603648 @guozhangwang I'm troubleshooting a broker-side issue involving the following exception ``` 2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139 ``` The troubleshooting is very painful without the stack trace. When do you think we can get this PR merged? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
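The missing stack trace in KAFKA-7016 stems from `ApiException` overriding `Throwable.fillInStackTrace()` to skip trace capture. A minimal reproduction of that pattern (the exception class here is a hypothetical stand-in, not Kafka's):

```java
// Minimal reproduction of the stack-trace suppression discussed in KAFKA-7016.
// SuppressedTraceException is a stand-in for ApiException-style classes.
public class SuppressedTraceException extends RuntimeException {

    public SuppressedTraceException(String message) {
        super(message);
    }

    // Returning `this` without capturing frames makes construction cheap,
    // but leaves getStackTrace() empty -- which is what makes broker-side
    // troubleshooting so painful.
    @Override
    public synchronized Throwable fillInStackTrace() {
        return this;
    }

    public static void main(String[] args) {
        Throwable t = new SuppressedTraceException("Controller epoch zkVersion check fails");
        System.out.println(t.getStackTrace().length); // 0: no frames were captured
    }
}
```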
[GitHub] [kafka] chia7712 merged pull request #10079: MINOR: replace hard-coding utf-8 with StandardCharsets.UTF_8
chia7712 merged pull request #10079: URL: https://github.com/apache/kafka/pull/10079 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572512463 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -353,50 +508,68 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. * @param cache The fetch session cache. * @param reqMetadataThe request metadata. - * @param fetchData The partition data from the fetch request. + * @param fetchDataAndError The partition data and topic ID errors from the fetch request. + * @param topicIds The map from topic name to topic IDs * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, - private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val fetchDataAndError: FetchDataAndError, + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { + val idErrors = new util.LinkedList(fetchDataAndError.idErrors.values()) override def getFetchOffset(part: TopicPartition): Option[Long] = -Option(fetchData.get(part)).map(_.fetchOffset) +Option(fetchDataAndError.fetchData.get(part)).map(_.fetchOffset) override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { -fetchData.forEach(fun(_, _)) +fetchDataAndError.fetchData.forEach(fun(_, _)) Review comment: I was referring to the following code. It seems to need to iterate every partition through fetchContext so that the UNKNOWN_TOPIC_OR_PARTITION error code can be added for each partition. 
``` if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { fetchContext.foreachPartition { (topicPartition, data) => if (!metadataCache.contains(topicPartition)) erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
junrao commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572510600 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File, val topicPartition: TopicPartition, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, - private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup { + private val hadCleanShutdown: Boolean = true, + val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup { Review comment: Yes, looks good to me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #9986: JUnit extensions for integration tests
cmccabe commented on pull request #9986: URL: https://github.com/apache/kafka/pull/9986#issuecomment-775590227 looks like there is a checkstyle issue: ``` [2021-02-08T23:40:02.927Z] [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-9986/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:20: Unused import [2021-02-08T23:40:02.927Z] [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-9986/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala:91: @nowarn annotation does not suppress any warnings ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #9986: JUnit extensions for integration tests
cmccabe commented on pull request #9986: URL: https://github.com/apache/kafka/pull/9986#issuecomment-775589690 LGTM. Thanks, @mumrah This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572505265 ## File path: core/src/test/java/kafka/test/ClusterConfig.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.io.File; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Represents a requested configuration of a Kafka cluster for integration testing + */ +public class ClusterConfig { + +private final Type type; +private final int brokers; +private final int controllers; +private final String name; +private final boolean autoStart; + +private final String securityProtocol; +private final String listenerName; +private final File trustStoreFile; + +private final Properties serverProperties = new Properties(); +private final Properties producerProperties = new Properties(); +private final Properties consumerProperties = new Properties(); +private final Properties adminClientProperties = new Properties(); +private final Properties saslServerProperties = new Properties(); +private final Properties saslClientProperties = new Properties(); + +ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart, + String securityProtocol, String listenerName, File trustStoreFile) { +this.type = type; +this.brokers = brokers; +this.controllers = controllers; +this.name = name; +this.autoStart = autoStart; +this.securityProtocol = securityProtocol; +this.listenerName = listenerName; +this.trustStoreFile = trustStoreFile; +} + +public Type clusterType() { +return type; +} + +public int brokers() { +return brokers; +} + +public int controllers() { +return controllers; Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
jolshan commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572482465 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File, val topicPartition: TopicPartition, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, - private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup { + private val hadCleanShutdown: Boolean = true, + val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup { Review comment: Something like this work for an explanation? ``` * boolean flag to indicate whether the partition.metadata file should be kept in the * log directory. A partition.metadata file is only created when the controller's * inter-broker protocol version is at least 2.8. This file will persist the topic ID on * the broker. If inter-broker protocol is downgraded below 2.8, a topic ID may be lost * and a new ID generated upon re-upgrade. If the inter-broker protocol version is below * 2.8, partition.metadata will be deleted to avoid ID conflicts upon re-upgrade. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
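The javadoc proposed above describes conditional handling of `partition.metadata` based on the inter-broker protocol version. A hedged sketch of that behavior in isolation (class and method names are illustrative, not the actual `Log.scala` logic):

```java
import java.io.File;

// Illustrative sketch of the keepPartitionMetadataFile flag described in the
// proposed javadoc: when the flag is false (inter-broker protocol below 2.8),
// an existing partition.metadata file is deleted to avoid topic ID conflicts
// on re-upgrade.
public class PartitionMetadataCleanup {

    static final String PARTITION_METADATA_FILE = "partition.metadata";

    /** Deletes partition.metadata when the file should not be kept; returns true if deleted. */
    public static boolean maybeDeletePartitionMetadata(File logDir, boolean keepPartitionMetadataFile) {
        File metadataFile = new File(logDir, PARTITION_METADATA_FILE);
        if (!keepPartitionMetadataFile && metadataFile.exists()) {
            return metadataFile.delete();
        }
        return false;
    }
}
```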
[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
jolshan commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572475420 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File, val topicPartition: TopicPartition, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, - private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup { + private val hadCleanShutdown: Boolean = true, + val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup { Review comment: good point. will do! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
junrao commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572474952 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File, val topicPartition: TopicPartition, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, - private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup { + private val hadCleanShutdown: Boolean = true, + val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup { Review comment: Could we add the new param to the javadoc? In the javadoc, it would be useful to explain a bit how this helps with re-upgrade. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572475014 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -353,50 +508,68 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. * @param cache The fetch session cache. * @param reqMetadataThe request metadata. - * @param fetchData The partition data from the fetch request. + * @param fetchDataAndError The partition data and topic ID errors from the fetch request. + * @param topicIds The map from topic name to topic IDs * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, - private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val fetchDataAndError: FetchDataAndError, + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { + val idErrors = new util.LinkedList(fetchDataAndError.idErrors.values()) override def getFetchOffset(part: TopicPartition): Option[Long] = -Option(fetchData.get(part)).map(_.fetchOffset) +Option(fetchDataAndError.fetchData.get(part)).map(_.fetchOffset) override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { -fetchData.forEach(fun(_, _)) +fetchDataAndError.fetchData.forEach(fun(_, _)) } override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { -FetchResponse.sizeOf(versionId, updates.entrySet.iterator) +FetchResponse.sizeOf(versionId, updates.entrySet.iterator, idErrors, topicIds) } override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = { def createNewSession: FetchSession.CACHE_MAP = { val cachedPartitions = new FetchSession.CACHE_MAP(updates.size) updates.forEach { (part, respData) => -val reqData = 
fetchData.get(part) -cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData)) +val reqData = fetchDataAndError.fetchData.get(part) +cachedPartitions.mustAdd(new CachedPartition(part, topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID), reqData, respData)) Review comment: I think this is used for handling the case of older request versions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572472724 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -237,14 +317,80 @@ class FetchSession(val id: Int, type TL = util.ArrayList[TopicPartition] // Update the cached partition data based on the request. - def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + def update(version: Short, + fetchDataAndError: FetchDataAndError, + toForgetAndIds: ToForgetAndIds, + reqMetadata: JFetchMetadata, + topicIds: util.Map[String, Uuid], + topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL -fetchData.forEach { (topicPart, reqData) => - val newCachedPart = new CachedPartition(topicPart, reqData) + +// Only make changes to topic IDs if we have a new request version. +// If we receive an old request version, ignore all topic ID code, keep IDs that are there. +if (version >= 13) { Review comment: I think we do since this is looking at the partitions cached in the session. I'll take another look though. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572470829 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -353,50 +508,68 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. * @param cache The fetch session cache. * @param reqMetadataThe request metadata. - * @param fetchData The partition data from the fetch request. + * @param fetchDataAndError The partition data and topic ID errors from the fetch request. + * @param topicIds The map from topic name to topic IDs * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, - private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val fetchDataAndError: FetchDataAndError, + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { + val idErrors = new util.LinkedList(fetchDataAndError.idErrors.values()) override def getFetchOffset(part: TopicPartition): Option[Long] = -Option(fetchData.get(part)).map(_.fetchOffset) +Option(fetchDataAndError.fetchData.get(part)).map(_.fetchOffset) override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { -fetchData.forEach(fun(_, _)) +fetchDataAndError.fetchData.forEach(fun(_, _)) Review comment: When you mention that some callers need to iterate over all partitions like CLUSTER ACTION -- I'm a little confused. I thought the request context was passed into AuthHelper for that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key
[ https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281463#comment-17281463 ] Piotr Fras commented on KAFKA-12213: Hi [~bbejeck], how are things with KIP-149 progressing? Is there anything I can help/assist with? > Kafka Streams aggregation Initializer to accept record key > -- > > Key: KAFKA-12213 > URL: https://issues.apache.org/jira/browse/KAFKA-12213 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Piotr Fras >Assignee: Piotr Fras >Priority: Minor > Labels: needs-kip > > Sometimes the Kafka record key contains useful information for creating a zero > object in the aggregation Initializer. This feature is to add the Kafka record key to > the Initializer. > There were two approaches I considered to implement this feature, one > respecting backwards compatibility for internal and external APIs and the > other one which does not. I chose the latter as it was more straightforward. > We may want to validate this approach, though. -- This message was sent by Atlassian Jira (v8.3.4#803005)
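The feature request above can be made concrete with a small sketch: today the Streams `Initializer` takes no arguments, so the zero aggregate cannot depend on the record key. A key-aware initializer would be a function of the key. Everything below (`KeyAwareInitializer`, the aggregation loop) is an illustrative model, not a Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Illustrative model of a key-aware aggregation initializer, as proposed in
// KAFKA-12213. Not Kafka Streams code.
public class KeyAwareAggregation {

    interface KeyAwareInitializer<K, VA> {
        VA apply(K key); // zero value may depend on the record key
    }

    static <K, V, VA> Map<K, VA> aggregate(Map<K, V> records,
                                           KeyAwareInitializer<K, VA> initializer,
                                           BiFunction<VA, V, VA> aggregator) {
        Map<K, VA> result = new HashMap<>();
        records.forEach((k, v) -> {
            // The zero aggregate embeds information from the key itself.
            VA zero = result.getOrDefault(k, initializer.apply(k));
            result.put(k, aggregator.apply(zero, v));
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> agg = aggregate(
            Map.of("a", 1, "b", 2),
            key -> "key=" + key + ":",     // key-aware zero value
            (acc, v) -> acc + v);
        System.out.println(agg);           // e.g. {a=key=a:1, b=key=b:2}
    }
}
```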
[GitHub] [kafka] jsancio opened a new pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio opened a new pull request #10085: URL: https://github.com/apache/kafka/pull/10085 WIP *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
junrao commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572450889 ## File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala ## @@ -217,6 +217,7 @@ class LogManagerTest { } assertTrue(log.numberOfSegments > 1, "There should be more than one segment now.") log.updateHighWatermark(log.logEndOffset) +log.partitionMetadataFile.get.write(Uuid.randomUuid()) Review comment: That's what I am thinking. It's kind of weird to add the new file just to make the existing math work. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
jolshan commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572445245 ## File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala ## @@ -217,6 +217,7 @@ class LogManagerTest { } assertTrue(log.numberOfSegments > 1, "There should be more than one segment now.") log.updateHighWatermark(log.logEndOffset) +log.partitionMetadataFile.get.write(Uuid.randomUuid()) Review comment: So remove the creation and subtract one file? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
junrao commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r57205 ## File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala ## @@ -217,6 +217,7 @@ class LogManagerTest { } assertTrue(log.numberOfSegments > 1, "There should be more than one segment now.") log.updateHighWatermark(log.logEndOffset) +log.partitionMetadataFile.get.write(Uuid.randomUuid()) Review comment: Got it. Since the partitionMetadataFile is now created on demand, perhaps we could just change the math on the expected number of files? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes
cmccabe commented on a change in pull request #10066: URL: https://github.com/apache/kafka/pull/10066#discussion_r572443431 ## File path: generator/src/main/java/org/apache/kafka/message/RequestApiScope.java ## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.message; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public enum RequestApiScope { +@JsonProperty("zkBroker") +ZK_BROKER, + +@JsonProperty("broker") +BROKER, + +@JsonProperty("controller") +CONTROLLER, + +@JsonProperty("raft") +RAFT; Review comment: @ijuma : unfortunately we don't have a clear separation between clients and brokers at the protocol level. As you know, if a random node connects to the broker and asks for ApiVersions, the broker doesn't (yet) know if the other node is another broker or if it's a client. So it wouldn't help to label APIs as CLIENT vs. BROKER, since the broker has to send back both anyway. On the other hand, we can definitely put APIs into "zk broker", "kip-500 zk", and "kip500 controller" and use those buckets to figure out what to send in the ApiVersionsRequest. @hachikuji : The "raft" scope seems a little weird since all the other scopes map to listeners on servers. 
The TestRaftServer is just an internal junit thing, right? So it's probably fine if it just uses the Controller scope and returns `UnsupportedVersionException` for things it doesn't support. I don't think we should have a separate scope for this unless it somehow impacts the real servers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
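The scope discussion above — which APIs each listener should advertise in its ApiVersions response — amounts to a simple table lookup. The `Api` enum and its scope assignments below are illustrative, not Kafka's real request tables:

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative sketch of listener scopes: each API declares which scopes
// expose it, and a server advertises only the APIs matching its own scope.
// The Api entries and scope assignments here are made up for demonstration.
public class ApiScopes {

    enum Scope { ZK_BROKER, BROKER, CONTROLLER }

    enum Api {
        PRODUCE(EnumSet.of(Scope.ZK_BROKER, Scope.BROKER)),
        FETCH(EnumSet.of(Scope.ZK_BROKER, Scope.BROKER)),
        VOTE(EnumSet.of(Scope.CONTROLLER));

        final Set<Scope> scopes;
        Api(Set<Scope> scopes) { this.scopes = scopes; }
    }

    // APIs a listener with the given scope should advertise in ApiVersions.
    static Set<Api> exposedApis(Scope scope) {
        EnumSet<Api> exposed = EnumSet.noneOf(Api.class);
        for (Api api : Api.values()) {
            if (api.scopes.contains(scope)) {
                exposed.add(api);
            }
        }
        return exposed;
    }

    public static void main(String[] args) {
        System.out.println(exposedApis(Scope.CONTROLLER)); // [VOTE]
    }
}
```

Any request outside the listener's scope would then get `UnsupportedVersionException`, as the comment suggests for the test raft server.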
[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager
rondagostino commented on a change in pull request #10069: URL: https://github.com/apache/kafka/pull/10069#discussion_r572442503 ## File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala ## @@ -39,11 +41,30 @@ object MetadataPartition { record.isr(), Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas Collections.emptyList(), - Collections.emptyList()) + Collections.emptyList(), + largestDeferredOffsetEverSeen = deferredAtOffset.getOrElse(OffsetNeverDeferred), + isCurrentlyDeferringChanges = deferredAtOffset.isDefined) Review comment: We basically use `largestDeferredOffsetEverSeen` only for logging at this point -- we also check it in a few `private def sanityCheckState...()` `RaftReplicaManager` methods. We could completely eliminate `largestDeferredOffsetEverSeen` if we didn't want to log when the partition was last deferred. It just tracks when the partition was last seen and the change at that offset was deferred rather than directly applied. Once the partition is no longer deferred the value remains whatever it was and the boolean flips to `false`. It does seem on the surface that we could change the declaration to `deferredSinceOffset` and get rid of the boolean -- and `deferredSinceOffset` would change to `-1` once those changes are applied. But there is a problem with this if the partition changes to not being deferred in the metadata cache before we ask `RaftReplicaManager` to process all of its deferred changes: the value will be -1 in the metadata cache under those circumstances, and we wouldn't have the value to log. So I think we have a few options. 1. Do the logging, apply the changes to the metadata cache before replica manager, and keep the `Long` and `Boolean` as currently defined 2. Do the logging, apply the changes to the metadata cache **after** replica manager, and use just a `Long` (with the semantics being changed as described above) 3. Just use a Boolean and don't do the logging.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
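The `Long`-plus-`Boolean` bookkeeping described in the comment above can be sketched as follows. This is a hypothetical model for illustration only (the field names are borrowed from the discussion, but this is not Kafka's actual `MetadataPartition` code), and it uses a `-1` sentinel rather than the PR's `0`:

```java
// Hypothetical sketch: track the largest offset at which a change for this
// partition was deferred, plus a separate flag, so the last-deferred offset
// survives for logging after the deferred changes are finally applied.
class DeferredPartitionState {
    static final long OFFSET_NEVER_DEFERRED = -1L; // sentinel; -1 is never a valid log offset

    long largestDeferredOffsetEverSeen = OFFSET_NEVER_DEFERRED;
    boolean isCurrentlyDeferringChanges = false;

    void defer(long offset) {
        largestDeferredOffsetEverSeen = Math.max(largestDeferredOffsetEverSeen, offset);
        isCurrentlyDeferringChanges = true;
    }

    void applyDeferredChanges() {
        // the offset is intentionally kept so it can still be logged later;
        // only the boolean flips, matching option 1 above
        isCurrentlyDeferringChanges = false;
    }
}
```

Collapsing both fields into a single `deferredSinceOffset` that resets to `-1` on apply would lose the value needed for logging, which is exactly the ordering problem the comment raises.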
[GitHub] [kafka] cmccabe commented on pull request #10069: MINOR: Add RaftReplicaManager
cmccabe commented on pull request #10069: URL: https://github.com/apache/kafka/pull/10069#issuecomment-775521260 Thanks for this PR, @rondagostino ! I can see why you wanted to have `RaftReplicaChangeDelegateHelper`. The `ReplicaManager` is not very easy to unit test because it has grown so large. I don't think this delegate thing is quite the right abstraction here-- it's pretty confusing-- but I guess let's revisit this after 2.8 is finished. I suppose one option is, once `ReplicaManager` is a pure interface, we can split the kip-500 update logic off into a separate set of functions that takes a `ReplicaManager` as an input. Then we can easily unit-test the update logic with a `MockReplicaManager`. For now I left two small comments... LGTM after those are addressed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10069: MINOR: Add RaftReplicaManager
cmccabe commented on a change in pull request #10069: URL: https://github.com/apache/kafka/pull/10069#discussion_r572433038 ## File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala ## @@ -30,7 +31,8 @@ import scala.jdk.CollectionConverters._ object MetadataPartition { - def apply(name: String, record: PartitionRecord): MetadataPartition = { + val OffsetNeverDeferred = 0L // must not be a valid offset we could see (i.e. must not be positive) Review comment: Can you add JavaDoc for this? Also, what about `NoDeferredOffset` as a name? One last question... why is this 0 and not -1? 0 is a valid offset in the log, whereas -1 is not. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10069: MINOR: Add RaftReplicaManager
cmccabe commented on a change in pull request #10069: URL: https://github.com/apache/kafka/pull/10069#discussion_r572433443 ## File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala ## @@ -39,11 +41,30 @@ object MetadataPartition { record.isr(), Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas Collections.emptyList(), - Collections.emptyList()) + Collections.emptyList(), + largestDeferredOffsetEverSeen = deferredAtOffset.getOrElse(OffsetNeverDeferred), + isCurrentlyDeferringChanges = deferredAtOffset.isDefined) Review comment: Hmm... why do we need this boolean? Can't we just check if `largestDeferredOffsetEverSeen` is not `OffsetNeverDeferred` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
jolshan commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572421631 ## File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala ## @@ -217,6 +217,7 @@ class LogManagerTest { } assertTrue(log.numberOfSegments > 1, "There should be more than one segment now.") log.updateHighWatermark(log.logEndOffset) +log.partitionMetadataFile.get.write(Uuid.randomUuid()) Review comment: I wanted to test the correct files are kept/deleted. I could also change the math in line 234 if we don't need this test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics
rhauch commented on pull request #9780: URL: https://github.com/apache/kafka/pull/9780#issuecomment-775505877 Added another commit that uses Mockito in the new `SharedTopicAdminTest` class to do what @kkonstantine had suggested and which was not possible with EasyMock. Mockito FTW! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics
rhauch commented on a change in pull request #9780: URL: https://github.com/apache/kafka/pull/9780#discussion_r572421771 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.util; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; + +import org.apache.kafka.connect.errors.ConnectException; +import org.easymock.EasyMock; +import org.easymock.Mock; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaBasedLog.class) +@PowerMockIgnore("javax.management.*") +public class SharedTopicAdminTest { + +private static final Map CONFIG = Collections.emptyMap(); + +@Mock private TopicAdmin mockTopicAdmin; +private SharedTopicAdmin sharedAdmin; +private int created = 0; Review comment: Okay, Mockito FTW! I've rewritten the `SharedTopicAdminTest` class to use Mockito instead of PowerMock and EasyMock, and was able to use mocks to assert the correct number of times an admin instance was created and closed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
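What the rewritten test verifies (the number of times the delegate admin is created and closed) can be modeled without any mocking framework. The sketch below is an assumption about the pattern under test, not Connect's actual `SharedTopicAdmin` code; the counters stand in for Mockito's `verify(..., times(n))` assertions:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a shared-admin wrapper: lazily create a single
// delegate, hand out the same instance on every call, and close it exactly once.
class SharedAdmin implements AutoCloseable {
    final AtomicInteger created = new AtomicInteger();
    final AtomicInteger closed = new AtomicInteger();
    private Object admin;            // stands in for the real TopicAdmin delegate
    private boolean isClosed = false;

    synchronized Object topicAdmin() {
        if (isClosed) throw new IllegalStateException("admin is closed");
        if (admin == null) {         // lazy creation on first use
            admin = new Object();
            created.incrementAndGet();
        }
        return admin;
    }

    @Override
    public synchronized void close() {
        if (!isClosed) {             // idempotent close
            isClosed = true;
            if (admin != null) closed.incrementAndGet();
        }
    }
}
```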
[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
jolshan commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572421631 ## File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala ## @@ -217,6 +217,7 @@ class LogManagerTest { } assertTrue(log.numberOfSegments > 1, "There should be more than one segment now.") log.updateHighWatermark(log.logEndOffset) +log.partitionMetadataFile.get.write(Uuid.randomUuid()) Review comment: I wanted to test that the file is actually getting deleted. In order to check it gets deleted it has to be created. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
jolshan commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572420716 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -341,10 +342,15 @@ class Log(@volatile private var _dir: File, producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq) loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown) -// Recover topic ID if present +// Delete partition metadata file if the version does not support topic IDs. +// Recover topic ID if present and topic IDs are supported partitionMetadataFile.foreach { file => - if (!file.isEmpty()) -topicId = file.read().topicId + if (file.exists()) { +if (!usesTopicId) + file.delete() Review comment: I modeled this off of `leaderEpochCache` which is also an option. But I see how that is different now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
jolshan commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572419692 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File, val topicPartition: TopicPartition, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, - private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup { + private val hadCleanShutdown: Boolean = true, + val usesTopicId: Boolean = true) extends Logging with KafkaMetricsGroup { Review comment: I named this based on the config's name. But the name you suggested is more descriptive. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written
junrao commented on a change in pull request #10041: URL: https://github.com/apache/kafka/pull/10041#discussion_r572399914 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -341,10 +342,15 @@ class Log(@volatile private var _dir: File, producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq) loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown) -// Recover topic ID if present +// Delete partition metadata file if the version does not support topic IDs. +// Recover topic ID if present and topic IDs are supported partitionMetadataFile.foreach { file => - if (!file.isEmpty()) -topicId = file.read().topicId + if (file.exists()) { +if (!usesTopicId) + file.delete() Review comment: Does partitionMetadataFile need to be a Some? It seems that we can just always instantiate the object. ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File, val topicPartition: TopicPartition, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, - private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup { + private val hadCleanShutdown: Boolean = true, + val usesTopicId: Boolean = true) extends Logging with KafkaMetricsGroup { Review comment: Could we add the new param to javadoc? Also, would keepPartitionMetadataFile be better than usesTopicId? ## File path: core/src/main/scala/kafka/server/PartitionMetadataFile.scala ## @@ -91,11 +91,10 @@ class PartitionMetadataFile(val file: File, private val lock = new Object() private val logDir = file.getParentFile.getParent - - try Files.createFile(file.toPath) // create the file if it doesn't exist - catch { case _: FileAlreadyExistsException => } - def write(topicId: Uuid): Unit = { +try Files.createFile(file.toPath) // create the file if it doesn't exist Review comment: Do we need to create the file first?
It seems that later on we always rename the temp file to this one. ## File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala ## @@ -217,6 +217,7 @@ class LogManagerTest { } assertTrue(log.numberOfSegments > 1, "There should be more than one segment now.") log.updateHighWatermark(log.logEndOffset) +log.partitionMetadataFile.get.write(Uuid.randomUuid()) Review comment: Is this needed? It seems Log never reads UUID? Ditto below. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
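The write-to-temp-then-rename pattern the reviewer refers to makes the up-front `Files.createFile` unnecessary: the atomic rename creates (or replaces) the target in one step, and readers never observe a partially written file. A minimal, self-contained sketch of that pattern (illustrative names, not Kafka's actual `PartitionMetadataFile` code):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class AtomicWrite {
    // Write contents to a sibling temp file, then atomically rename it onto the
    // target. The target need not exist beforehand, and a crash mid-write leaves
    // either the old file or the new one, never a torn file.
    static void write(Path target, String contents) {
        try {
            Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
            Files.write(tmp, contents.getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```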
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572413017 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -71,6 +75,7 @@ object FetchSession { * localLogStartOffset is the log start offset of the partition on this broker. */ class CachedPartition(val topic: String, + var topicId: Uuid, Review comment: I thought about this, but I was worried about some weirdness where we need to support partitions before and after they have an ID. (The partitions are technically equivalent, but equals wouldn't reflect that.) This may also cause problems in cases where IDs may change. Consider the code `val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))` This is used in the path of deleting partitions with stale IDs. We would need to know the topic ID to find the partition here. I could see potential issues where we no longer have the ID and would have trouble removing it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request #10084: MINOR: Rename DecommissionBrokers to UnregisterBrokers
cmccabe opened a new pull request #10084: URL: https://github.com/apache/kafka/pull/10084 Rename DecommissionBrokers to UnregisterBrokers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests
mumrah commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572393736 ## File path: core/src/test/java/kafka/test/ClusterConfig.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.io.File; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Represents a requested configuration of a Kafka cluster for integration testing + */ +public class ClusterConfig { + +private final Type type; +private final int brokers; +private final int controllers; +private final String name; +private final boolean autoStart; + +private final String securityProtocol; +private final String listenerName; +private final File trustStoreFile; + +private final Properties serverProperties = new Properties(); +private final Properties producerProperties = new Properties(); +private final Properties consumerProperties = new Properties(); +private final Properties adminClientProperties = new Properties(); +private final Properties saslServerProperties = new Properties(); +private final Properties saslClientProperties = new Properties(); + +ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart, + String securityProtocol, String listenerName, File trustStoreFile) { +this.type = type; +this.brokers = brokers; +this.controllers = controllers; +this.name = name; +this.autoStart = autoStart; +this.securityProtocol = securityProtocol; +this.listenerName = listenerName; +this.trustStoreFile = trustStoreFile; +} + +public Type clusterType() { +return type; +} + +public int brokers() { +return brokers; +} + +public int controllers() { +return controllers; Review comment: In ZK mode, number of controllers is basically ignored. I can add some logic to validate that someone hasn't mistakenly requested more than one controller for a ZK cluster. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
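The validation mumrah describes (rejecting a request for more than one controller when the cluster is ZK-backed) could look like the following. This is a sketch under assumed names; the enum values and field names are illustrative, not necessarily the PR's final API:

```java
// Hypothetical cluster types for the JUnit extension's ClusterConfig.
enum ClusterType { ZK, RAFT }

final class ClusterConfigCheck {
    // A ZK-backed cluster ignores the controller count, so asking for more
    // than one controller is almost certainly a mistake worth failing fast on.
    static void validate(ClusterType type, int controllers) {
        if (controllers < 1)
            throw new IllegalArgumentException("at least one controller is required");
        if (type == ClusterType.ZK && controllers > 1)
            throw new IllegalArgumentException(
                "ZK clusters support exactly one controller, but " + controllers + " were requested");
    }
}
```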
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572392149 ## File path: core/src/test/java/kafka/test/annotation/AutoStart.java ## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test.annotation; + +public enum AutoStart { Review comment: Thanks for the explanation. That makes sense. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests
mumrah commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572387041 ## File path: core/src/test/java/kafka/test/annotation/AutoStart.java ## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test.annotation; + +public enum AutoStart { Review comment: Since we can't use `null` as defaults for annotations, I added DEFAULT enum constants for cluster type and auto start attributes. When processing the `@ClusterTest` method annotations, if we see Type.DEFAULT or AutoStart.DEFAULT, we know the test method is saying "use the default". We could simply put the defaults right in the `ClusterTest` annotation, but then we would have an issue when method-level `@ClusterTest` and class-level `@ClusterTestDefaults` are both in use. How would we know which to use? I added AutoStart.DEFAULT in order to let the method-level annotation explicitly defer to the default. 
Here's an example:
```java
// indicate that for this test class, we want to disable auto-start by default
@ClusterTestDefaults(autoStart = false)
public class TestClass {
    // but this test does want auto start
    @ClusterTest(autoStart = AutoStart.YES)
    public void test1() { }

    // this test is unnecessarily setting auto start as false
    @ClusterTest(autoStart = AutoStart.NO)
    public void test2() { }

    // this test will get its auto start value from the class level
    @ClusterTest
    public void test3() { }
}
```
An alternative would be to allow for a class-level annotation that completely overrides any method-level `@ClusterTest`, but that seems a little backwards. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
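The resolution rule described in that thread (a method-level value wins unless it is `DEFAULT`, in which case the class-level `@ClusterTestDefaults` boolean applies) can be captured in a few lines. This is a minimal model of the rule as described, not the PR's actual annotation-processing code:

```java
// DEFAULT exists because annotation attributes cannot default to null;
// it lets a method-level @ClusterTest explicitly defer to the class level.
enum AutoStart { YES, NO, DEFAULT }

final class AutoStartResolver {
    static boolean resolve(AutoStart methodLevel, boolean classLevelDefault) {
        switch (methodLevel) {
            case YES: return true;
            case NO:  return false;
            default:  return classLevelDefault; // defer to @ClusterTestDefaults
        }
    }
}
```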
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572381787 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -143,34 +275,71 @@ public String toString() { return result; } +private List toForgottenTopicList(List forgottenTopics, Map topicNames) { +List result = new ArrayList<>(); +forgottenTopics.forEach(forgottenTopic -> +forgottenTopic.partitions().forEach(partitionId -> { +String name = topicNames.get(forgottenTopic.topicId()); +if (name != null) +result.add(new TopicPartition(forgottenTopic.topic(), partitionId)); +}) +); +return result; +} + +// Only used when Fetch is version 13 or greater. +private ToForgetAndIds toForgottenTopicListAndIds(List forgottenTopics, Map topicNames) { +List result = new ArrayList<>(); +Map> unresolvedIds = new HashMap<>(); +forgottenTopics.forEach(forgottenTopic -> { +Set partitions = new HashSet<>(); +forgottenTopic.partitions().forEach(partitionId -> { +String name = topicNames.get(forgottenTopic.topicId()); +if (name != null) +result.add(new TopicPartition(forgottenTopic.topic(), partitionId)); +else +partitions.add(partitionId); +}); +if (unresolvedIds.containsKey(forgottenTopic.topicId())) { +unresolvedIds.get(forgottenTopic.topicId()).addAll(partitions); +} else { +unresolvedIds.put(forgottenTopic.topicId(), partitions); Review comment: Ah. This is confusing due to how I named things. Basically, I'm collecting a set of partitions `partitions` for a given topic where the ID was not resolved. Then I'm adding them to unresolvedIds. This is a mapping from the topic ID to all the partitions that should be forgotten. I can rename and add comments to clarify what is happening here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
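As an aside on the `unresolvedIds.containsKey(...)` / `put(...)` dance in the quoted diff: the same grouping (topic ID to the set of partitions to forget) can be written in one step with `Map.computeIfAbsent`. A self-contained sketch, with `String` standing in for `Uuid`:

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ForgottenGrouping {
    // Group (topicId, partition) pairs into topicId -> set of partitions,
    // creating the set on first sight of each topic ID.
    static Map<String, Set<Integer>> group(List<Map.Entry<String, Integer>> forgotten) {
        Map<String, Set<Integer>> unresolvedIds = new HashMap<>();
        for (Map.Entry<String, Integer> e : forgotten) {
            unresolvedIds.computeIfAbsent(e.getKey(), id -> new HashSet<>()).add(e.getValue());
        }
        return unresolvedIds;
    }
}
```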
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572377972 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -133,6 +200,71 @@ public String toString() { return Collections.unmodifiableMap(result); } +private Map toPartitionDataMap(List fetchableTopics, Map topicNames) { +Map result = new LinkedHashMap<>(); +fetchableTopics.forEach(fetchTopic -> { +String name = topicNames.get(fetchTopic.topicId()); +if (name != null) { +fetchTopic.partitions().forEach(fetchPartition -> +result.put(new TopicPartition(name, fetchPartition.partition()), +new PartitionData( +fetchPartition.fetchOffset(), +fetchPartition.logStartOffset(), +fetchPartition.partitionMaxBytes(), + optionalEpoch(fetchPartition.currentLeaderEpoch()), + optionalEpoch(fetchPartition.lastFetchedEpoch()) +) +) +); +} +}); +return Collections.unmodifiableMap(result); +} + +// Only used when Fetch is version 13 or greater. +private FetchDataAndError toPartitionDataMapAndError(List fetchableTopics, Map topicNames) { +Map fetchData = new LinkedHashMap<>(); +List unresolvedPartitions = new LinkedList<>(); +Map idErrors = new HashMap<>(); +Errors error; +if (topicNames.isEmpty()) { +error = Errors.UNSUPPORTED_VERSION; +} else { +error = Errors.UNKNOWN_TOPIC_ID; +} +fetchableTopics.forEach(fetchTopic -> { +String name = topicNames.get(fetchTopic.topicId()); +if (name != null) { +fetchTopic.partitions().forEach(fetchPartition -> +fetchData.put(new TopicPartition(name, fetchPartition.partition()), +new PartitionData( +fetchPartition.fetchOffset(), +fetchPartition.logStartOffset(), +fetchPartition.partitionMaxBytes(), + optionalEpoch(fetchPartition.currentLeaderEpoch()), + optionalEpoch(fetchPartition.lastFetchedEpoch()) +) +) +); +} else { +unresolvedPartitions.add(new UnresolvedPartitions(fetchTopic.topicId(), fetchTopic.partitions().stream().collect(Collectors.toMap( +FetchRequestData.FetchPartition::partition, 
fetchPartition -> new PartitionData( +fetchPartition.fetchOffset(), +fetchPartition.logStartOffset(), +fetchPartition.partitionMaxBytes(), +optionalEpoch(fetchPartition.currentLeaderEpoch()), +optionalEpoch(fetchPartition.lastFetchedEpoch())); + +if (idErrors.containsKey(fetchTopic.topicId())) + idErrors.get(fetchTopic.topicId()).addPartitions(fetchTopic.partitions().stream().map(part -> part.partition()).collect(Collectors.toList())); Review comment: I realize this is a bit confusing. The addPartitions method takes a list. What this line does is grab the idError object and add partitions to it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
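The bookkeeping described above can be sketched with hypothetical simplified types (the real code keys `IdError` objects by topic `Uuid` and carries an error code): group the erroring partitions per topic ID, creating the entry on first sight and appending afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class IdErrorAccumulationDemo {
    // Hypothetical simplified stand-in for the idErrors map in the diff above:
    // each topic ID accumulates the partitions that could not be resolved.
    private final Map<UUID, List<Integer>> idErrors = new HashMap<>();

    // Mirrors the pattern under review: if an entry for the topic ID already
    // exists, grab it and add the partitions; otherwise create a new entry.
    public void addPartitions(UUID topicId, List<Integer> partitions) {
        idErrors.computeIfAbsent(topicId, id -> new ArrayList<>()).addAll(partitions);
    }

    public List<Integer> partitionsFor(UUID topicId) {
        return idErrors.getOrDefault(topicId, new ArrayList<>());
    }

    public static void main(String[] args) {
        IdErrorAccumulationDemo demo = new IdErrorAccumulationDemo();
        UUID topicId = UUID.fromString("11111111-1111-1111-1111-111111111111");
        demo.addPartitions(topicId, Arrays.asList(0, 1));
        demo.addPartitions(topicId, Arrays.asList(2));
        System.out.println(demo.partitionsFor(topicId)); // [0, 1, 2]
    }
}
```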
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572373623 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -425,16 +598,26 @@ class IncrementalFetchContext(private val time: Time, val topicPart = element.getKey val respData = element.getValue val cachedPart = session.partitionMap.find(new CachedPartition(topicPart)) -val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected) -if (mustRespond) { + +if (cachedPart.topicId == Uuid.ZERO_UUID) + cachedPart.addId(topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)) + +if (cachedPart.topicId != topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)) { nextElement = element - if (updateFetchContextAndRemoveUnselected) { -session.partitionMap.remove(cachedPart) -session.partitionMap.mustAdd(cachedPart) - } + session.partitionMap.remove(cachedPart) + iter.remove() Review comment: If we run into this scenario, does it make sense to always return with an UNKNOWN_TOPIC_ID error? Sometimes partitions will be skipped over anyway when `mustRespond` is false, so should those also return UNKNOWN_TOPIC_ID?
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572370494 ## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ## @@ -216,10 +217,10 @@ class ReplicaFetcherThread(name: String, try { val clientResponse = leaderEndpoint.sendRequest(fetchRequest) val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]] - if (!fetchSessionHandler.handleResponse(fetchResponse)) { + if (!fetchSessionHandler.handleResponse(fetchResponse, fetchRequest.latestAllowedVersion())) { Review comment: Good point, it should probably be along the lines of `if (fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) 12 else fetchRequestVersion` to match how it is sent below.
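The fallback jolshan suggests can be sketched as a tiny helper (hypothetical name; the real check lives in the fetch request building path): when the session contains topics whose IDs are not yet known, drop back to version 12, the last fetch version keyed by topic name rather than topic ID.

```java
public class FetchVersionFallbackDemo {
    // Hypothetical helper mirroring the suggested check:
    // `if (fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) 12 else fetchRequestVersion`
    public static short effectiveVersion(short fetchRequestVersion, boolean canUseTopicIds) {
        if (fetchRequestVersion >= 13 && !canUseTopicIds) {
            return 12; // last fetch version addressed by topic name rather than topic ID
        }
        return fetchRequestVersion;
    }

    public static void main(String[] args) {
        System.out.println(effectiveVersion((short) 13, false)); // 12
        System.out.println(effectiveVersion((short) 13, true));  // 13
        System.out.println(effectiveVersion((short) 12, false)); // 12
    }
}
```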
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572367849 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -237,14 +317,80 @@ class FetchSession(val id: Int, type TL = util.ArrayList[TopicPartition] // Update the cached partition data based on the request. - def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + def update(version: Short, + fetchDataAndError: FetchDataAndError, + toForgetAndIds: ToForgetAndIds, + reqMetadata: JFetchMetadata, + topicIds: util.Map[String, Uuid], + topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL -fetchData.forEach { (topicPart, reqData) => - val newCachedPart = new CachedPartition(topicPart, reqData) + +// Only make changes to topic IDs if we have a new request version. +// If we receive an old request version, ignore all topic ID code, keep IDs that are there. +if (version >= 13) { + val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID + val unresolvedIterator = unresolvedPartitions.iterator() + while (unresolvedIterator.hasNext()) { +val partition = unresolvedIterator.next() + +// Remove from unresolvedPartitions if ID is unresolved in toForgetIds +val forgetPartitions = toForgetAndIds.toForgetIds.get(partition.topicId) +if (forgetPartitions != null && forgetPartitions.contains(partition.partition)) + unresolvedIterator.remove() + +// Try to resolve ID, if there is a name for the given ID, place a CachedPartition in partitionMap +// and remove from unresolvedPartitions. 
+else if (topicNames.get(partition.topicId) != null) { + val newTp = new TopicPartition(topicNames.get(partition.topicId), partition.partition) + val newCp = new CachedPartition(newTp, partition.topicId, partition.reqData) + partitionMap.add(newCp) + added.add(newTp) + unresolvedIterator.remove() +} else { + val idError = fetchDataAndError.idErrors.get(partition.topicId) Review comment: This is for adding unresolved partitions that are in the session but not in the request. I can add comments to clarify what is happening.
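The resolution loop in the quoted diff can be illustrated with a simplified sketch (hypothetical types; the real code promotes entries into `partitionMap` as `CachedPartition`s): walk the unresolved entries and, whenever the topic ID now maps to a name, move the entry into the resolved set and drop it from the unresolved list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class ResolveUnresolvedDemo {
    // Hypothetical simplified stand-in for an unresolved partition entry.
    public static final class Unresolved {
        final UUID topicId;
        final int partition;
        public Unresolved(UUID topicId, int partition) {
            this.topicId = topicId;
            this.partition = partition;
        }
    }

    // Walk the unresolved list; when topicId now maps to a name, promote the
    // entry into the resolved set (here just "name-partition") and remove it.
    public static Set<String> resolve(List<Unresolved> unresolved, Map<UUID, String> topicNames) {
        Set<String> resolved = new LinkedHashSet<>();
        Iterator<Unresolved> it = unresolved.iterator();
        while (it.hasNext()) {
            Unresolved u = it.next();
            String name = topicNames.get(u.topicId);
            if (name != null) {
                resolved.add(name + "-" + u.partition);
                it.remove();
            } // otherwise the entry stays unresolved for a later pass
        }
        return resolved;
    }

    public static void main(String[] args) {
        UUID known = UUID.fromString("11111111-1111-1111-1111-111111111111");
        UUID unknown = UUID.fromString("22222222-2222-2222-2222-222222222222");
        List<Unresolved> pending = new ArrayList<>();
        pending.add(new Unresolved(known, 0));
        pending.add(new Unresolved(unknown, 3));
        Map<UUID, String> names = new HashMap<>();
        names.put(known, "topicA");
        System.out.println(resolve(pending, names)); // [topicA-0]
        System.out.println(pending.size());          // 1
    }
}
```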
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572363402 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -270,24 +314,40 @@ public T records() { */ public FetchResponse(Errors error, LinkedHashMap> responseData, + List idErrors, + Map topicIds, int throttleTimeMs, int sessionId) { super(ApiKeys.FETCH); -this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); +this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), idErrors, topicIds, sessionId); this.responseDataMap = responseData; } public FetchResponse(FetchResponseData fetchResponseData) { super(ApiKeys.FETCH); this.data = fetchResponseData; -this.responseDataMap = toResponseDataMap(fetchResponseData); +if (!supportsTopicIds()) { +this.responseDataMap = toResponseDataMap(fetchResponseData); +} else { +this.responseDataMap = null; +} } public Errors error() { return Errors.forCode(data.errorCode()); } -public LinkedHashMap> responseData() { +public LinkedHashMap> responseData(Map topicNames) { +if (!supportsTopicIds()) +return responseDataMap; +return toResponseDataMap(data, topicNames); + +} + +// Used when we can guarantee responseData is populated with all possible partitions +// This occurs when we have a response version < 13 or we built the FetchResponse with +// responseDataMap as a parameter and we have the same topic IDs available. +public LinkedHashMap> resolvedResponseData() { Review comment: I agree. I think it stems from exactly what you said...that `FetchContext.updateAndGenerateResponseData()` generates a response only for it to be generated again. Currently ` FetchContext.updateAndGenerateResponseData()` does include all partitions (resolved and unresolved). The issue is that the partitions need to be down-converted. The way this works is that the partitions are pulled from the FetchResponse object itself. 
However, the issue is that I've changed responseData, and since this is the newest version of the response, it will try to reconstruct the map instead of pulling the object `partitionData` (which is too slow). I thought about changing the method to always return the map when it is not null, but that caused some issues in some places as well. I can look into this again though.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572357857 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -237,14 +317,80 @@ class FetchSession(val id: Int, type TL = util.ArrayList[TopicPartition] // Update the cached partition data based on the request. - def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + def update(version: Short, + fetchDataAndError: FetchDataAndError, + toForgetAndIds: ToForgetAndIds, + reqMetadata: JFetchMetadata, + topicIds: util.Map[String, Uuid], + topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL -fetchData.forEach { (topicPart, reqData) => - val newCachedPart = new CachedPartition(topicPart, reqData) + +// Only make changes to topic IDs if we have a new request version. +// If we receive an old request version, ignore all topic ID code, keep IDs that are there. +if (version >= 13) { + val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID Review comment: I've gone back and forth on this. On one hand, you are right that this is confusing in the case where we are doing an upgrade and ID propagation is delayed. On the other hand, in the non-upgrade case, returning an UNKNOWN_TOPIC_ID error when topic IDs are not even supported might not be as informative.
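The trade-off weighed here reduces to a one-line decision, sketched below with a hypothetical helper: an empty `topicNames` map means the broker has no topic IDs at all (likely a pre-topic-ID cluster or an old request), so UNSUPPORTED_VERSION is reported; otherwise an ID with no matching name gets UNKNOWN_TOPIC_ID.

```java
import java.util.Map;
import java.util.UUID;

public class SessionErrorChoiceDemo {
    public enum Error { UNSUPPORTED_VERSION, UNKNOWN_TOPIC_ID }

    // Mirrors `if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID`
    // from the quoted diff (types simplified for illustration).
    public static Error errorFor(Map<UUID, String> topicNames) {
        return topicNames.isEmpty() ? Error.UNSUPPORTED_VERSION : Error.UNKNOWN_TOPIC_ID;
    }
}
```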
[GitHub] [kafka] cmccabe merged pull request #10043: MINOR: Add StorageTool as specified in KIP-631
cmccabe merged pull request #10043: URL: https://github.com/apache/kafka/pull/10043
[GitHub] [kafka] mumrah commented on a change in pull request #10043: MINOR: Add StorageTool as specified in KIP-631
mumrah commented on a change in pull request #10043: URL: https://github.com/apache/kafka/pull/10043#discussion_r572357477 ## File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala ## @@ -88,9 +88,9 @@ class RawMetaProperties(val props: Properties = new Properties()) { } override def toString: String = { -"RawMetaProperties(" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map { +"{" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map { Review comment: ok, sounds good
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572356813 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -110,7 +116,68 @@ public String toString() { } } -private Optional optionalEpoch(int rawEpochValue) { +public static final class UnresolvedPartitions { Review comment: The reason I named it this is that we maintain such an object for each partition that was unresolved. If we simply have one object per topic, we would need a way to know all the partitions for the topic that were requested.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572355958 ## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ## @@ -265,9 +353,32 @@ public FetchRequestData build() { Map curSessionPartitions = copySessionPartitions ? Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)) : Collections.unmodifiableMap(sessionPartitions); +Map toSendTopicIds = +Collections.unmodifiableMap(new HashMap<>(sessionTopicIds)); +Map toSendTopicNames = +Collections.unmodifiableMap(new HashMap<>(sessionTopicNames)); +boolean canUseTopicIds = sessionPartitionsPerTopic.size() == toSendTopicIds.size(); + next = null; +topicIds = null; +topicNames = null; +partitionsPerTopic = null; return new FetchRequestData(toSend, Collections.unmodifiableList(removed), -curSessionPartitions, nextMetadata); +curSessionPartitions, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds); +} + +private void addPartitionsAndIds(Map> partitionsPerTopic, + TopicPartition tp, Uuid id, Map topicIds, + Map topicNames) { +if (partitionsPerTopic.containsKey(tp.topic())) { +partitionsPerTopic.get(tp.topic()).add(tp.partition()); +} else { +partitionsPerTopic.put(tp.topic(), new HashSet<>(tp.partition())); Review comment: Yeah. Good catch. I'm going to experiment with this code a bit to see if it's faster to maintain this set or just get a set of topics from the map of topic partitions in `FetchRequestData`.
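The "good catch" here is worth spelling out: `new HashSet<>(tp.partition())` invokes the `HashSet(int initialCapacity)` constructor, so the partition number is treated as a capacity hint and never added as an element. A minimal demonstration:

```java
import java.util.HashSet;
import java.util.Set;

public class HashSetCtorDemo {
    public static void main(String[] args) {
        int partition = 5;

        // Buggy pattern from the diff: this calls HashSet(int initialCapacity),
        // so 5 is only a sizing hint and the set stays empty.
        Set<Integer> buggy = new HashSet<>(partition);

        // Intended behaviour: create the set, then add the partition as an element.
        Set<Integer> fixed = new HashSet<>();
        fixed.add(partition);

        System.out.println(buggy.size()); // 0
        System.out.println(fixed.size()); // 1
    }
}
```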
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572354491 ## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ## @@ -73,6 +76,25 @@ public FetchSessionHandler(LogContext logContext, int node) { private LinkedHashMap sessionPartitions = new LinkedHashMap<>(0); +/** + * All of the topic ids mapped to topic names for topics which exist in the fetch request session. + */ +private Map sessionTopicIds = new HashMap<>(0); Review comment: I think I was just matching `sessionPartitions` above
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572351945 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -270,24 +314,40 @@ public T records() { */ public FetchResponse(Errors error, LinkedHashMap> responseData, + List idErrors, Review comment: Sounds good to me
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572351801 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -259,8 +262,49 @@ public T records() { } } +public static final class IdError { +private final Uuid id; +private final Set partitions; +private final Errors error; + +public IdError(Uuid id, +List partitions, +Errors error) { +this.id = id; +this.partitions = new HashSet<>(partitions); +this.error = error; +} + +public Uuid id() { +return this.id; +} + +public Set partitions() { +return this.partitions; +} + +public void addPartitions(List partitions) { +partitions.forEach(partition -> { +partitions.add(partition); +}); +} + +private List errorData() { +return partitions.stream().map(partition -> new FetchResponseData.FetchablePartitionResponse() +.setPartition(partition) +.setErrorCode(error.code()) +.setHighWatermark(FetchResponse.INVALID_HIGHWATERMARK) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) +.setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET) +.setAbortedTransactions(null) + .setPreferredReadReplica(FetchResponse.INVALID_PREFERRED_REPLICA_ID) + .setRecordSet(MemoryRecords.EMPTY)).collect(Collectors.toList()); +} + +} + /** - * From version 3 or later, the entries in `responseData` should be in the same order as the entries in + * From version 3 or later, the 'interesting' entries in `responseData` should be in the same order as the entries in Review comment: 'interesting' was the name of the map of partitionData. I believe they are topic partitions that are authorized and exist.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572350818 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -237,14 +317,80 @@ class FetchSession(val id: Int, type TL = util.ArrayList[TopicPartition] // Update the cached partition data based on the request. - def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + def update(version: Short, + fetchDataAndError: FetchDataAndError, + toForgetAndIds: ToForgetAndIds, + reqMetadata: JFetchMetadata, + topicIds: util.Map[String, Uuid], + topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL -fetchData.forEach { (topicPart, reqData) => - val newCachedPart = new CachedPartition(topicPart, reqData) + +// Only make changes to topic IDs if we have a new request version. Review comment: Sorry this was unclear. I meant changes involving topic IDs. I will adjust this comment.
[jira] [Commented] (KAFKA-10728) Mirroring data without decompressing with MirrorMaker 2.0
[ https://issues.apache.org/jira/browse/KAFKA-10728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281352#comment-17281352 ] Henry Cai commented on KAFKA-10728: --- Please take a look at [https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring,] this should help. > Mirroring data without decompressing with MirrorMaker 2.0 > - > > Key: KAFKA-10728 > URL: https://issues.apache.org/jira/browse/KAFKA-10728 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Eazhilan Nagarajan >Priority: Major > > Hello, > > I use MirrorMaker 2.0 to copy data across two Kafka clusters and it's all > working fine. Recently we enabled compression while producing data into any > topic, which had a very positive impact on storage and other resources, but > while mirroring, the data seems to be decompressed at the target Kafka > cluster. I tried enabling compression using the below config in MM2; the data > at the target cluster is compressed now, but the decompress-and-recompress > cycle continues to happen and it eats up a lot of resources unnecessarily. > > {noformat} > - alias: my-passive-cluster > authentication: > passwordSecret: > password: password > secretName: passive-cluster-secret > type: scram-sha-512 > username: user-1 > bootstrapServers: my-passive-cluster.com:443 > config: > config.storage.replication.factor: 3 > offset.storage.replication.factor: 3 > status.storage.replication.factor: 3 > producer.compression.type: gzip{noformat} > I found a couple of Jira issues talking about it but I don't know if the > shallow iterator option is available now. > https://issues.apache.org/jira/browse/KAFKA-732, > https://issues.apache.org/jira/browse/KAFKA-845 > > Kindly let me know if this is currently available or if it'll be available in the > future. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe commented on a change in pull request #10043: MINOR: Add StorageTool as specified in KIP-631
cmccabe commented on a change in pull request #10043: URL: https://github.com/apache/kafka/pull/10043#discussion_r572340541 ## File path: core/src/main/scala/kafka/tools/StorageTool.scala ## @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import java.io.PrintStream +import java.nio.file.{Files, Paths} + +import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties} +import kafka.utils.{Exit, Logging} +import net.sourceforge.argparse4j.ArgumentParsers +import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue} +import org.apache.kafka.common.Uuid +import org.apache.kafka.common.utils.Utils + +import scala.collection.mutable + +object StorageTool extends Logging { + def main(args: Array[String]): Unit = { +try { + val parser = ArgumentParsers. +newArgumentParser("kafka-storage"). +defaultHelp(true). +description("The Kafka storage tool.") + val subparsers = parser.addSubparsers().dest("command") + + val infoParser = subparsers.addParser("info"). +help("Get information about the Kafka log directories on this node.") + val formatParser = subparsers.addParser("format"). 
+help("Format the Kafka log directories on this node.") + subparsers.addParser("random-uuid").help("Print a random UUID.") + List(infoParser, formatParser).foreach(parser => { +parser.addArgument("--config", "-c"). + action(store()). + required(true). + help("The Kafka configuration file to use.") + }) + formatParser.addArgument("--cluster-id", "-t"). +action(store()). +required(true). +help("The cluster ID to use.") + formatParser.addArgument("--ignore-formatted", "-g"). +action(storeTrue()) + + val namespace = parser.parseArgsOrFail(args) + val command = namespace.getString("command") + val config = Option(namespace.getString("config")).flatMap( +p => Some(new KafkaConfig(Utils.loadProps(p + + command match { +case "info" => + val directories = configToLogDirectories(config.get) + val kip500Mode = configToKip500Mode(config.get) + Exit.exit(infoCommand(System.out, kip500Mode, directories)) + +case "format" => + val directories = configToLogDirectories(config.get) + val clusterId = namespace.getString("cluster_id") + val metaProperties = buildMetadataProperties(clusterId, config.get) + val ignoreFormatted = namespace.getBoolean("ignore_formatted") + if (!configToKip500Mode(config.get)) { +throw new TerseFailure("The kafka configuration file appears to be for " + + "a legacy cluster. 
Formatting is only supported for kip-500 clusters.") + } + Exit.exit(formatCommand(System.out, directories, metaProperties, ignoreFormatted )) + +case "random-uuid" => + System.out.println(Uuid.randomUuid) + Exit.exit(0) + +case _ => + throw new RuntimeException(s"Unknown command $command") + } +} catch { + case e: TerseFailure => +System.err.println(e.getMessage) +System.exit(1) +} + } + + def configToLogDirectories(config: KafkaConfig): Seq[String] = { +val directories = new mutable.TreeSet[String] +directories ++= config.logDirs +Option(config.metadataLogDir).foreach(directories.add) +directories.toSeq + } + + def configToKip500Mode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty + + def infoCommand(stream: PrintStream, kip500Mode: Boolean, directories: Seq[String]): Int = { +val problems = new mutable.ArrayBuffer[String] +val foundDirectories = new mutable.ArrayBuffer[String] +var prevMetadata: Option[RawMetaProperties] = None +directories.sorted.foreach(directory => { + val directoryPath = Paths.get(directory) + if (!Files.isDirectory(directoryPath)) { +if (!Files.exists(directoryPath)) { + problems += s"$directoryPath does not exist" +} else { + problems += s"$directoryPath is not a directory" +} + }
[GitHub] [kafka] cmccabe commented on a change in pull request #10043: MINOR: Add StorageTool as specified in KIP-631
cmccabe commented on a change in pull request #10043: URL: https://github.com/apache/kafka/pull/10043#discussion_r572340025 ## File path: core/src/main/scala/kafka/tools/StorageTool.scala ## @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import java.io.PrintStream +import java.nio.file.{Files, Paths} + +import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties} +import kafka.utils.{Exit, Logging} +import net.sourceforge.argparse4j.ArgumentParsers +import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue} +import org.apache.kafka.common.Uuid +import org.apache.kafka.common.utils.Utils + +import scala.collection.mutable + +object StorageTool extends Logging { + def main(args: Array[String]): Unit = { +try { + val parser = ArgumentParsers. +newArgumentParser("kafka-storage"). +defaultHelp(true). +description("The Kafka storage tool.") + val subparsers = parser.addSubparsers().dest("command") + + val infoParser = subparsers.addParser("info"). +help("Get information about the Kafka log directories on this node.") + val formatParser = subparsers.addParser("format"). 
+help("Format the Kafka log directories on this node.") + subparsers.addParser("random-uuid").help("Print a random UUID.") + List(infoParser, formatParser).foreach(parser => { +parser.addArgument("--config", "-c"). + action(store()). + required(true). + help("The Kafka configuration file to use.") + }) + formatParser.addArgument("--cluster-id", "-t"). +action(store()). +required(true). +help("The cluster ID to use.") + formatParser.addArgument("--ignore-formatted", "-g"). +action(storeTrue()) + + val namespace = parser.parseArgsOrFail(args) + val command = namespace.getString("command") + val config = Option(namespace.getString("config")).flatMap( +p => Some(new KafkaConfig(Utils.loadProps(p + + command match { +case "info" => + val directories = configToLogDirectories(config.get) + val kip500Mode = configToKip500Mode(config.get) + Exit.exit(infoCommand(System.out, kip500Mode, directories)) + +case "format" => + val directories = configToLogDirectories(config.get) + val clusterId = namespace.getString("cluster_id") + val metaProperties = buildMetadataProperties(clusterId, config.get) + val ignoreFormatted = namespace.getBoolean("ignore_formatted") + if (!configToKip500Mode(config.get)) { +throw new TerseFailure("The kafka configuration file appears to be for " + + "a legacy cluster. 
Formatting is only supported for kip-500 clusters.") + } + Exit.exit(formatCommand(System.out, directories, metaProperties, ignoreFormatted )) + +case "random-uuid" => + System.out.println(Uuid.randomUuid) + Exit.exit(0) + +case _ => + throw new RuntimeException(s"Unknown command $command") + } +} catch { + case e: TerseFailure => +System.err.println(e.getMessage) +System.exit(1) +} + } + + def configToLogDirectories(config: KafkaConfig): Seq[String] = { +val directories = new mutable.TreeSet[String] +directories ++= config.logDirs +Option(config.metadataLogDir).foreach(directories.add) +directories.toSeq + } + + def configToKip500Mode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty + + def infoCommand(stream: PrintStream, kip500Mode: Boolean, directories: Seq[String]): Int = { +val problems = new mutable.ArrayBuffer[String] +val foundDirectories = new mutable.ArrayBuffer[String] +var prevMetadata: Option[RawMetaProperties] = None +directories.sorted.foreach(directory => { + val directoryPath = Paths.get(directory) + if (!Files.isDirectory(directoryPath)) { +if (!Files.exists(directoryPath)) { + problems += s"$directoryPath does not exist" +} else { + problems += s"$directoryPath is not a directory" +} + }
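The `configToLogDirectories` helper quoted above merges `log.dirs` with the optional metadata log directory through a sorted set, so duplicates collapse and the output order is stable. A rough standalone Java equivalent (class and method names here are illustrative, not Kafka's API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;

public class LogDirsDemo {
    // Rough equivalent of configToLogDirectories: merge the configured
    // log.dirs with an optional metadata log dir, dropping duplicates
    // and returning the directories in sorted order.
    static List<String> logDirectories(List<String> logDirs, Optional<String> metadataLogDir) {
        SortedSet<String> dirs = new TreeSet<>(logDirs);
        metadataLogDir.ifPresent(dirs::add);
        return new ArrayList<>(dirs);
    }

    public static void main(String[] args) {
        System.out.println(logDirectories(
                Arrays.asList("/tmp/kafka-logs-b", "/tmp/kafka-logs-a", "/tmp/kafka-logs-b"),
                Optional.of("/tmp/kafka-metadata")));
        // prints [/tmp/kafka-logs-a, /tmp/kafka-logs-b, /tmp/kafka-metadata]
    }
}
```

Using a sorted set rather than a plain list is what lets `infoCommand` iterate the directories deterministically when reporting problems.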
[GitHub] [kafka] cmccabe commented on a change in pull request #10043: MINOR: Add StorageTool as specified in KIP-631
cmccabe commented on a change in pull request #10043: URL: https://github.com/apache/kafka/pull/10043#discussion_r572338881 ## File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala ## @@ -88,9 +88,9 @@ class RawMetaProperties(val props: Properties = new Properties()) { } override def toString: String = { -"RawMetaProperties(" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map { +"{" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map { Review comment: It doesn't produce JSON (for example, keys and values are not quoted). This is intended for human consumption. It will show up in the command-line output. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
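As the comment notes, the output is a plain brace-delimited listing meant for humans, not JSON. The map body is elided in the quoted diff, so the exact `key=value` rendering below is an assumption; this Java sketch only demonstrates the style under discussion (sorted keys, no quoting):

```java
import java.util.Properties;
import java.util.stream.Collectors;

public class RawMetaPropsDemo {
    // Hypothetical reproduction of the toString style discussed above:
    // keys are sorted, but unlike JSON neither keys nor values are quoted.
    static String humanReadable(Properties props) {
        return "{" + props.stringPropertyNames().stream()
                .sorted()
                .map(key -> key + "=" + props.getProperty(key))
                .collect(Collectors.joining(", ")) + "}";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("version", "1");
        props.setProperty("cluster.id", "abc");
        System.out.println(humanReadable(props)); // {cluster.id=abc, version=1}
    }
}
```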
[GitHub] [kafka] cmccabe commented on pull request #10047: MINOR: Add ClusterTool as specified in KIP-631
cmccabe commented on pull request #10047: URL: https://github.com/apache/kafka/pull/10047#issuecomment-775412214 Committed to 2.8
[GitHub] [kafka] cmccabe merged pull request #10047: MINOR: Add ClusterTool as specified in KIP-631
cmccabe merged pull request #10047: URL: https://github.com/apache/kafka/pull/10047
[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r568140154 ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -37,7 +38,10 @@ object FetchSession { type REQ_MAP = util.Map[TopicPartition, FetchRequest.PartitionData] type RESP_MAP = util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] type CACHE_MAP = ImplicitLinkedHashCollection[CachedPartition] + type UNRESOLVED_CACHE = util.HashSet[CachedUnresolvedPartition] type RESP_MAP_ITER = util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]] + type TOPIC_ID_MAP = util.Map[String,Uuid] Review comment: space after comma ## File path: core/src/main/scala/kafka/server/FetchSession.scala ## @@ -187,24 +205,86 @@ class CachedPartition(val topic: String, } } +/** + * Very similar to CachedPartition above, CachedUnresolvedPartition is used for incremental fetch requests. + * These objects store partitions that had topic IDs that could not be resolved by the broker. + * + * Upon each incremental request in the session, these partitions will be loaded. They can either be removed + * through resolving the partition with the broker's topicNames map or by receiving an unresolved toForget ID. + * + * Since these partitions signify an error, they will always be returned in the response. 
+ */ + +class CachedUnresolvedPartition(val topicId: Uuid, + val partition: Int, + var maxBytes: Int, + var fetchOffset: Long, + var leaderEpoch: Optional[Integer], + var fetcherLogStartOffset: Long, + var lastFetchedEpoch: Optional[Integer]) { + + def this(id: Uuid, partition: Int) = +this(id, partition, -1, -1, Optional.empty(), -1, Optional.empty[Integer]) + + def this(id: Uuid, partition: Int, reqData: FetchRequest.PartitionData) = +this(id, partition, reqData.maxBytes, reqData.fetchOffset, + reqData.currentLeaderEpoch, reqData.logStartOffset, reqData.lastFetchedEpoch) + + def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch) + + def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = { +// Update our cached request parameters. +maxBytes = reqData.maxBytes +fetchOffset = reqData.fetchOffset +fetcherLogStartOffset = reqData.logStartOffset +leaderEpoch = reqData.currentLeaderEpoch +lastFetchedEpoch = reqData.lastFetchedEpoch + } + + override def hashCode: Int = (31 * partition) + topicId.hashCode + + def canEqual(that: Any) = that.isInstanceOf[CachedUnresolvedPartition] + + override def equals(that: Any): Boolean = +that match { + case that: CachedUnresolvedPartition => +this.eq(that) || + (that.canEqual(this) && +this.partition.equals(that.partition) && +this.topicId.equals(that.topicId)) + case _ => false +} + + override def toString: String = synchronized { +"CachedPartition(Id=" + topicId + + ", partition=" + partition + + ", maxBytes=" + maxBytes + + ", fetchOffset=" + fetchOffset + + ", fetcherLogStartOffset=" + fetcherLogStartOffset + + ")" + } +} + /** * The fetch session. * * Each fetch session is protected by its own lock, which must be taken before mutable * fields are read or modified. This includes modification of the session partition map. * - * @param id The unique fetch session ID. - * @param privileged True if this session is privileged. 
Sessions crated by followers - * are privileged; sesssion created by consumers are not. - * @param partitionMap The CachedPartitionMap. - * @param creationMs The time in milliseconds when this session was created. - * @param lastUsedMs The last used time in milliseconds. This should only be updated by - * FetchSessionCache#touch. - * @param epochThe fetch session sequence number. + * @param id The unique fetch session ID. + * @param privileged True if this session is privileged. Sessions crated by followers + * are privileged; sesssion created by consumers are not. + * @param partitionMap The CachedPartitionMap. + * @param unresolvedPartitions The CachedUnresolvedPartitionMap + * @param creationMs The time in milliseconds when this session was created. + * @param lastUsedMs The last used time in milliseconds. This should only be updated by + * FetchSessionCache#touch. + * @param epoch The fetch session sequence number. */ class FetchSession(val id: Int, val privileged: Boolean, val partitionMap:
[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests
mumrah commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572310749 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +ZK, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. 
If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ +ListenerName listener(); + +/** + * The broker connect string which can be used by clients for bootstrapping + */ +String brokerList(); + +/** + * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is + * acting as the controller (since ZK controllers serve both broker and controller roles). + */ +Collection brokerSocketServers(); + +/** + * A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also + * currently the active controller. For Raft-based clusters, this will return all controller servers. + */ +Collection controllerSocketServers(); + +/** + * Any one of the broker servers. + */ +Optional anyBrokerSocketServer(); Review comment: Yea we can ditch the Optional for these This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572310383 ## File path: core/src/test/scala/integration/kafka/server/IntegrationTestHelper.scala ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package integration.kafka.server + +import kafka.network.SocketServer +import kafka.utils.{NotNothing, TestUtils} +import org.apache.kafka.common.network.{ListenerName, Mode} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, RequestTestUtils, ResponseHeader} +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.utils.Utils + +import java.io.{DataInputStream, DataOutputStream} +import java.net.Socket +import java.nio.ByteBuffer +import java.util.Properties +import scala.annotation.nowarn +import scala.reflect.ClassTag + +class IntegrationTestHelper { Review comment: These are all static methods, right? Can we just make this an "object" rather than a class and have something like `object IntegrationTestUtils` ? This is an automated message from the Apache Git Service. 
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572309452 ## File path: core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test.junit; + +import integration.kafka.server.IntegrationTestHelper; +import kafka.api.IntegrationTestHarness; +import kafka.network.SocketServer; +import kafka.server.KafkaServer; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import org.junit.jupiter.api.extension.AfterTestExecutionCallback; +import org.junit.jupiter.api.extension.BeforeTestExecutionCallback; +import org.junit.jupiter.api.extension.Extension; +import org.junit.jupiter.api.extension.TestTemplateInvocationContext; +import scala.Option; +import scala.collection.JavaConverters; +import scala.compat.java8.OptionConverters; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test invocation. Each instance of this + * class is provided with a configuration for the cluster. 
+ * + * This context also provides parameter resolvers for: + * + * + * ClusterConfig (the same instance passed to the constructor) + * ClusterInstance (includes methods to expose underlying SocketServer-s) + * IntegrationTestHelper (helper methods) + * + */ +public class ZkClusterInvocationContext implements TestTemplateInvocationContext { + +private final ClusterConfig clusterConfig; +private final AtomicReference clusterReference; + +public ZkClusterInvocationContext(ClusterConfig clusterConfig) { +this.clusterConfig = clusterConfig; +this.clusterReference = new AtomicReference<>(); +} + +@Override +public String getDisplayName(int invocationIndex) { +String clusterDesc = clusterConfig.nameTags().entrySet().stream() +.map(Object::toString) +.collect(Collectors.joining(", ")); +return String.format("[Zk %d] %s", invocationIndex, clusterDesc); +} + +@Override +public List getAdditionalExtensions() { +ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, clusterReference); +return Arrays.asList( +(BeforeTestExecutionCallback) context -> { +// We have to wait to actually create the underlying cluster until after our @BeforeEach methods +// have run. This allows tests to set up external dependencies like ZK, MiniKDC, etc. 
+// However, since we cannot create this instance until we are inside the test invocation, we have +// to use a container class (AtomicReference) to provide this cluster object to the test itself + +// This is what tests normally extend from to start a cluster, here we create it anonymously and +// configure the cluster using values from ClusterConfig +IntegrationTestHarness cluster = new IntegrationTestHarness() { +@Override +public Properties serverConfig() { +return clusterConfig.serverProperties(); +} + +@Override +public Properties adminClientConfig() { +return clusterConfig.adminClientProperties(); +} + +@Override +public Properties consumerConfig() { +return clusterConfig.consumerProperties(); +} + +@Override +public Properties producerConfig() { +return
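The deferred-provisioning pattern described in the comments above (create the cluster only once the test invocation starts, and hand it to the test through an `AtomicReference` container) can be sketched like this, with all names illustrative:

```java
import java.util.concurrent.atomic.AtomicReference;

public class DeferredClusterDemo {
    static final class Cluster {
        boolean started;
        void start() { started = true; }
    }

    // The holder is created eagerly (so it can be injected as a test parameter),
    // but the cluster itself is only provisioned inside the before-test callback,
    // after @BeforeEach methods have had a chance to set up external dependencies.
    static final AtomicReference<Cluster> holder = new AtomicReference<>();

    static void beforeTestExecution() {
        Cluster cluster = new Cluster();
        cluster.start();
        holder.set(cluster);
    }
}
```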
[GitHub] [kafka] d8tltanc commented on pull request #9865: Fill in the 2.8 release note for Authorizer
d8tltanc commented on pull request #9865: URL: https://github.com/apache/kafka/pull/9865#issuecomment-775382411 Hi @rajinisivaram. Thanks for the comments. I've addressed all of them. Could you take a look and approve it? Thanks.
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572291568 ## File path: core/src/test/java/kafka/test/annotation/AutoStart.java ## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test.annotation; + +public enum AutoStart { Review comment: capitalize enum names please. also, do we need this enum? Why does DEFAULT need to be distinct from YES? Once we have a lot of tests depending on the default being "on" I can't see us changing this (or how having 3 values would help us change it if so...) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572290904 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +Zk, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. 
If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ +ListenerName listener(); + +/** + * The broker connect string which can be used by clients for bootstrapping + */ +String brokerList(); + +/** + * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is + * acting as the controller (since ZK controllers serve both broker and controller roles). + */ +Collection brokers(); + +/** + * A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also + * currently the active controller. For Raft-based clusters, this will return all controller servers. + */ +Collection controllers(); + +/** + * Any one of the broker servers. + */ +Optional anyBroker(); + +/** + * Any one of the controller servers. + */ +Optional anyController(); + +/** + * The underlying object which is responsible for setting up and tearing down the cluster. + */ +Object getUnderlying(); + +default T getUnderlying(Class asClass) { +return asClass.cast(getUnderlying()); +} + +Admin createAdminClient(Properties configOverrides); + +default Admin createAdminClient() { +return createAdminClient(new Properties()); +} + +void start(); + +void stop(); Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572290383 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +ZK, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. 
If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ +ListenerName listener(); + +/** + * The broker connect string which can be used by clients for bootstrapping + */ +String brokerList(); + +/** + * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is + * acting as the controller (since ZK controllers serve both broker and controller roles). + */ +Collection brokerSocketServers(); + +/** + * A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also + * currently the active controller. For Raft-based clusters, this will return all controller servers. + */ +Collection controllerSocketServers(); + +/** + * Any one of the broker servers. + */ +Optional anyBrokerSocketServer(); + +/** + * Any one of the controller servers. + */ +Optional anyControllerSocketServer(); Review comment: Same question... can we just throw an exception if there are no controller socket servers? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572290152 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +ZK, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. 
If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ +ListenerName listener(); + +/** + * The broker connect string which can be used by clients for bootstrapping + */ +String brokerList(); + +/** + * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is + * acting as the controller (since ZK controllers serve both broker and controller roles). + */ +Collection brokerSocketServers(); + +/** + * A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also + * currently the active controller. For Raft-based clusters, this will return all controller servers. + */ +Collection controllerSocketServers(); + +/** + * Any one of the broker servers. + */ +Optional anyBrokerSocketServer(); Review comment: Why does this return an `Optional<>`? Zero-node clusters seem like a corner case we don't really care about (we can just throw an exception there) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
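The non-`Optional` variant cmccabe is suggesting could look like the following sketch: pick any broker and fail fast on an empty cluster rather than pushing the corner case onto every caller. The names here are stand-ins, not the PR's final API:

```java
import java.util.Collection;

public class AnyBrokerDemo {
    interface SocketServer {}  // stand-in for kafka.network.SocketServer

    // Instead of returning Optional<SocketServer>, throw on the zero-node
    // corner case, which tests don't realistically need to handle.
    static SocketServer anyBrokerSocketServer(Collection<SocketServer> brokers) {
        return brokers.stream()
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("Cluster has no brokers"));
    }
}
```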
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572289080 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +ZK, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. 
If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ +ListenerName listener(); + +/** + * The broker connect string which can be used by clients for bootstrapping + */ +String brokerList(); Review comment: I realize `brokerList` is the existing name, but given that we've standardized on `--bootstrap-server` for the command-line argument, maybe it makes sense to use that name here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572286453 ## File path: core/src/test/java/kafka/test/ClusterInstance.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.network.SocketServer; +import kafka.test.annotation.ClusterTest; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.network.ListenerName; + +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +public interface ClusterInstance { + +enum ClusterType { +Zk, +// Raft +} + +/** + * Cluster type. For now, only ZK is supported. + */ +ClusterType clusterType(); + +/** + * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will + * have no affect on the cluster since it is already provisioned. + */ +ClusterConfig config(); + +/** + * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. 
If + * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT + */ Review comment: How about naming these "clientSecurityProtocol", "clientListenerName", etc. to reflect the fact that the clients will be using these security protocols, listener names, etc. Then we can add more listeners later for specific tests without doing a big rename. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12305) Flatten SMT fails on arrays
[ https://issues.apache.org/jira/browse/KAFKA-12305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281295#comment-17281295 ] Randall Hauch commented on KAFKA-12305: --- {quote} Do you have thoughts on whether elements inside arrays should themselves be flattened? {quote} I think it's arguable whether Flatten should flatten structs/maps *with* arrays. My initial thought is that it should not because the documentation does not mention arrays, and because it's not clear to me how often that case will come up. I'm definitely open to suggestions from others, though! > Flatten SMT fails on arrays > --- > > Key: KAFKA-12305 > URL: https://issues.apache.org/jira/browse/KAFKA-12305 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1, > 2.8.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > The {{Flatten}} SMT fails for array types. A sophisticated approach that > tries to flatten arrays might be desirable in some cases, and may have been > punted during the early design phase of the transform, but in the interim, > it's probably not worth it to make array data and the SMT mutually exclusive. > A naive approach that preserves arrays as-are and doesn't attempt to flatten > them seems fair for now, but one alternative could be to traverse array > elements and, if any are maps or structs, flatten those as well. > Adding behavior to fully flatten arrays by essentially transforming them into > maps whose elements are the elements of the array and whose keys are the > indices of each element is likely out of scope for a bug fix and, although > useful, might have to wait for a KIP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
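To make the trade-off concrete, here is a minimal, hypothetical sketch of the "naive" option discussed above (this is not Connect's actual `Flatten` implementation): nested maps are flattened into dotted keys, while array values are passed through unchanged instead of causing the transform to fail.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: nested Maps get dotted keys; any non-Map value,
// including Lists/arrays, is copied through as-is rather than rejected.
class FlattenSketch {
    static Map<String, Object> flatten(String prefix, Map<String, Object> value) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : value.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object v = e.getValue();
            if (v instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) v;
                out.putAll(flatten(key, nested));
            } else {
                // Arrays and lists land here untouched: preserved, not flattened.
                out.put(key, v);
            }
        }
        return out;
    }
}
```

The fully-flattened alternative mentioned in the issue (keying array elements by index) would add an `instanceof List` branch here, which is the behavior change the reporter suggests would need a KIP.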
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572284766 ## File path: core/src/test/java/kafka/test/ClusterConfig.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.io.File; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Represents a requested configuration of a Kafka cluster for integration testing + */ +public class ClusterConfig { + +private final Type type; +private final int brokers; +private final int controllers; +private final String name; +private final boolean autoStart; + +private final String securityProtocol; +private final String listenerName; +private final File trustStoreFile; + +private final Properties serverProperties = new Properties(); +private final Properties producerProperties = new Properties(); +private final Properties consumerProperties = new Properties(); +private final Properties adminClientProperties = new Properties(); +private final Properties saslServerProperties = new Properties(); +private final Properties saslClientProperties = new Properties(); + +ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart, + String securityProtocol, String listenerName, File trustStoreFile) { +this.type = type; +this.brokers = brokers; +this.controllers = controllers; +this.name = name; +this.autoStart = autoStart; +this.securityProtocol = securityProtocol; +this.listenerName = listenerName; +this.trustStoreFile = trustStoreFile; +} + +public Type clusterType() { +return type; +} + +public int brokers() { +return brokers; +} + +public int controllers() { +return controllers; Review comment: What does this return in ZK mode? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
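One hedged way the reviewer's question could be resolved (purely illustrative, not what the PR does) is to make the accessor explicit about modes that have no quorum controllers, rather than returning a count that is meaningless under ZK:

```java
import java.util.OptionalInt;

// Illustrative sketch only: answer "what does controllers() return in ZK
// mode?" by reporting absence explicitly. The names here are assumptions,
// not the PR's actual API.
class ClusterConfigSketch {
    enum Type { ZK, RAFT }

    private final Type type;
    private final int controllers;

    ClusterConfigSketch(Type type, int controllers) {
        this.type = type;
        this.controllers = controllers;
    }

    OptionalInt controllers() {
        // ZK clusters have no Raft controllers, so report "none" explicitly.
        return type == Type.ZK ? OptionalInt.empty() : OptionalInt.of(controllers);
    }
}
```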
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572284631 ## File path: core/src/test/java/kafka/test/ClusterConfig.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.io.File; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Represents a requested configuration of a Kafka cluster for integration testing + */ +public class ClusterConfig { + +private final Type type; +private final int brokers; +private final int controllers; +private final String name; +private final boolean autoStart; + +private final String securityProtocol; +private final String listenerName; +private final File trustStoreFile; + +private final Properties serverProperties = new Properties(); +private final Properties producerProperties = new Properties(); +private final Properties consumerProperties = new Properties(); +private final Properties adminClientProperties = new Properties(); +private final Properties saslServerProperties = new Properties(); +private final Properties saslClientProperties = new Properties(); + +ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart, + String securityProtocol, String listenerName, File trustStoreFile) { +this.type = type; +this.brokers = brokers; +this.controllers = controllers; +this.name = name; +this.autoStart = autoStart; +this.securityProtocol = securityProtocol; +this.listenerName = listenerName; +this.trustStoreFile = trustStoreFile; +} + +public Type clusterType() { +return type; +} + +public int brokers() { Review comment: let's use "numBrokers" , etc. to reflect the fact that we're not returning the actual brokers. Same for "controllers" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
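As a sketch of the suggestion above (the names are the reviewer's proposal, not the merged API), the count accessors would read:

```java
// Hypothetical rename sketch: "numBrokers"/"numControllers" make clear that
// these accessors return counts, not collections of broker objects.
class ClusterCounts {
    private final int numBrokers;
    private final int numControllers;

    ClusterCounts(int numBrokers, int numControllers) {
        this.numBrokers = numBrokers;
        this.numControllers = numControllers;
    }

    int numBrokers() { return numBrokers; }
    int numControllers() { return numControllers; }
}
```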
[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests
cmccabe commented on a change in pull request #9986: URL: https://github.com/apache/kafka/pull/9986#discussion_r572284053 ## File path: core/src/test/java/kafka/test/ClusterConfig.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.io.File; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Represents a requested configuration of a Kafka cluster for integration testing + */ +public class ClusterConfig { + +private final Type type; +private final int brokers; +private final int controllers; +private final String name; +private final boolean autoStart; + +private final String securityProtocol; Review comment: If there's no reason to use a string then let's just use the `SecurityProtocol` enum. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
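The point about the `String` field can be sketched as follows. Kafka's real `SecurityProtocol` enum lives in `org.apache.kafka.common.security.auth`; a local stand-in enum is used here so the example is self-contained.

```java
// Self-contained sketch: a stand-in for Kafka's SecurityProtocol enum.
// A typed field rejects invalid values at compile time, where a String
// field would silently accept typos like "PLANTEXT".
enum SecurityProtocolSketch { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

class TypedClusterConfig {
    private final SecurityProtocolSketch securityProtocol;

    TypedClusterConfig(SecurityProtocolSketch securityProtocol) {
        this.securityProtocol = securityProtocol;
    }

    SecurityProtocolSketch securityProtocol() {
        return securityProtocol;
    }
}
```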