[GitHub] [kafka] chia7712 commented on a change in pull request #10084: MINOR: Rename DecommissionBrokers to UnregisterBrokers

2021-02-08 Thread GitBox


chia7712 commented on a change in pull request #10084:
URL: https://github.com/apache/kafka/pull/10084#discussion_r572633977



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -217,6 +217,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 case ApiKeys.ENVELOPE => handleEnvelope(request)
 case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
 case ApiKeys.DESCRIBE_PRODUCERS => 
handleDescribeProducersRequest(request)
+case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, 
handleUnregisterBrokerRequest)

Review comment:
   @cmccabe Thanks for the explanation. It seems the 
[comment](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L221)
 is now out of date:
   ```
   Handle requests that should have been sent to the KIP-500 controller
   ```
   
   Could you revise the comment as well?









[jira] [Updated] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException

2021-02-08 Thread Lucas Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-12315:
---
Description: 
As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen
 1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and
 adds some entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown
 due to zkVersion check failure

4. The ControllerMovedException is captured by the ZkPartitionStateMachine and 
an error such as the following is created:
{code:java}
2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
 org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139{code}
5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point, the controller has resigned, however the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the 
leftover state:

 
{code:java}
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Error while moving some replicas to OnlineReplica state
 java.lang.IllegalStateException: Controller to broker state change requests 
batch is not empty while creating a new one. Some StopReplica state changes 
Map(6121 -> ListBuffer(StopReplicaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost
 at 
kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) 
~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.elect(KafkaController.scala:1484) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.process(KafkaController.scala:2065) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) 
~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 
[scala-library-2.12.10.jar:?]
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) 
[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
 [kafka_2.12-2.4.1.10.jar:?]
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
[kafka_2.12-2.4.1.10.jar:?]{code}
Essentially, the controller is not able to transition some replicas to 
OnlineReplica state, and it cannot send any requests to any brokers via the 
ReplicaStateMachine.

  was:
As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen
 1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and
 adds some entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown
 due to zkVersion check failure

4. The ControllerMovedException is captured by the ZkPartitionStateMachine and 
an error such as the following is created:

2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
 org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139

5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point, the controller has resigned, however the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the left 
over state:

 
{code:java}
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] 

[GitHub] [kafka] cmccabe commented on a change in pull request #10084: MINOR: Rename DecommissionBrokers to UnregisterBrokers

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #10084:
URL: https://github.com/apache/kafka/pull/10084#discussion_r572627716



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -217,6 +217,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 case ApiKeys.ENVELOPE => handleEnvelope(request)
 case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
 case ApiKeys.DESCRIBE_PRODUCERS => 
handleDescribeProducersRequest(request)
+case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, 
handleUnregisterBrokerRequest)

Review comment:
   This call needs to be forwarded to the KIP-500 controller. That is 
different from the other KIP-500 RPCs, which are controller RPCs and are not 
expected to be received on the broker at all.









[GitHub] [kafka] chia7712 opened a new pull request #10086: MINOR: expose number of forwarding requests to metrics

2021-02-08 Thread GitBox


chia7712 opened a new pull request #10086:
URL: https://github.com/apache/kafka/pull/10086


   This used to be an exposed metric (#9580 removed it). The number of requests 
in a queue is an important metric (for example, the queue in `SocketServer`), so 
we should expose it again.
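   Not from this PR, but a generic sketch of exposing a queue-depth gauge with 
the Yammer metrics library that Kafka core uses (the class and metric names 
below are made up):
   ```java
   import com.yammer.metrics.Metrics;
   import com.yammer.metrics.core.Gauge;

   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;

   class ForwardingQueueMetrics {
       private final BlockingQueue<Runnable> pending = new ArrayBlockingQueue<>(1000);

       ForwardingQueueMetrics() {
           // The gauge is polled by the metrics reporter; each poll reads the
           // current queue depth, so no extra bookkeeping is needed.
           Metrics.newGauge(ForwardingQueueMetrics.class, "NumPendingForwardingRequests",
               new Gauge<Integer>() {
                   @Override
                   public Integer value() {
                       return pending.size();
                   }
               });
       }
   }
   ```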
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Updated] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException

2021-02-08 Thread Lucas Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-12315:
---
Description: 
As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen
 1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and
 adds some entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown
 due to zkVersion check failure

4. The ControllerMovedException is captured by the ZkPartitionStateMachine and 
an error such as the following is created:

2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
 org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139

5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point, the controller has resigned, however the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the 
leftover state:

 
{code:java}
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Error while moving some replicas to OnlineReplica state
 java.lang.IllegalStateException: Controller to broker state change requests 
batch is not empty while creating a new one. Some StopReplica state changes 
Map(6121 -> ListBuffer(StopReplicaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost
 at 
kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) 
~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.elect(KafkaController.scala:1484) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.process(KafkaController.scala:2065) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) 
~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 
[scala-library-2.12.10.jar:?]
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) 
[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
 [kafka_2.12-2.4.1.10.jar:?]
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
[kafka_2.12-2.4.1.10.jar:?]{code}


 Essentially, the controller is not able to transition some replicas to 
OnlineReplica state, and it cannot send any requests to any brokers via the 
ReplicaStateMachine.

  was:
As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen
1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and
   adds some entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown
   due to zkVersion check failure

4. The ControllerMovedException is captured by the ZkPartitionStateMachine and 
an error such as the following is created:

2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139

5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point, the controller has resigned, however the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the left 
over state:

```
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine 

[jira] [Created] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException

2021-02-08 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-12315:
--

 Summary: Clearing the ZkReplicaStateMachine request batch state 
upon ControllerMovedException
 Key: KAFKA-12315
 URL: https://issues.apache.org/jira/browse/KAFKA-12315
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
 Attachments: controller_moved_left_over_state.png

As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen
1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and
   adds some entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown
   due to zkVersion check failure

4. The ControllerMovedException is captured by the ZkPartitionStateMachine and 
an error such as the following is created:

2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139

5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point, the controller has resigned, however the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the 
leftover state:

```
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Error while moving some replicas to OnlineReplica state
java.lang.IllegalStateException: Controller to broker state change requests 
batch is not empty while creating a new one. Some StopReplica state changes 
Map(6121 -> ListBuffer(StopReplicaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost
at 
kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383)
 ~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109)
 ~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) 
~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365)
 ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.elect(KafkaController.scala:1484) 
~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) 
~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.process(KafkaController.scala:2065) 
~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) 
~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
 ~[kafka_2.12-2.4.1.10.jar:?]
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 
[scala-library-2.12.10.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) 
[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
 [kafka_2.12-2.4.1.10.jar:?]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
[kafka_2.12-2.4.1.10.jar:?]
```
Essentially, the controller is not able to transition some replicas to 
OnlineReplica state, and it cannot send any requests to any brokers via the 
ReplicaStateMachine.
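
To make the failure mode concrete, here is a minimal Java sketch (the real 
classes are Scala under kafka.controller; all names and structure below are 
simplified assumptions) of the newBatch() invariant and the clear-on-resignation 
step this ticket proposes:

{code:java}
import java.util.HashMap;
import java.util.Map;

class ControllerBrokerRequestBatch {
    // Simplified stand-in for stopReplicaRequestMap.
    private final Map<Integer, String> stopReplicaRequestMap = new HashMap<>();

    void addStopReplicaRequestForBrokers(int brokerId, String partition) {
        stopReplicaRequestMap.put(brokerId, partition);
    }

    // Mirrors AbstractControllerBrokerRequestBatch.newBatch: a new batch must
    // not start while state from a previous batch is still present.
    void newBatch() {
        if (!stopReplicaRequestMap.isEmpty()) {
            throw new IllegalStateException(
                "Controller to broker state change requests batch is not empty "
                    + "while creating a new one: " + stopReplicaRequestMap);
        }
    }

    // Proposed fix: invoke this when ControllerMovedException forces the
    // controller to resign, so the next onControllerFailover starts clean.
    void clear() {
        stopReplicaRequestMap.clear();
    }
}
{code}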





[jira] [Resolved] (KAFKA-10835) Replace Runnable and Callable overrides with lambdas in Connect

2021-02-08 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10835.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Replace Runnable and Callable overrides with lambdas in Connect
> ---
>
> Key: KAFKA-10835
> URL: https://issues.apache.org/jira/browse/KAFKA-10835
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Konstantine Karantasis
>Assignee: Lev Zemlyanov
>Priority: Minor
> Fix For: 2.8.0
>
>
> We've been using Java 8 for some time now, of course. Replacing the overrides 
> from the pre-Java 8 era will simplify some parts of the code and reduce 
> verbosity. 
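
For illustration, the kind of mechanical rewrite this covers (a generic 
example, not taken from the Connect code):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LambdaCleanupExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Pre-Java-8 style: anonymous Runnable override.
        executor.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("task ran");
            }
        });

        // Java 8 style: the same task as a lambda.
        executor.submit(() -> System.out.println("task ran"));

        executor.shutdown();
    }
}
{code}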





[jira] [Resolved] (KAFKA-10834) Remove redundant type casts in Connect

2021-02-08 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10834.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Remove redundant type casts in Connect
> --
>
> Key: KAFKA-10834
> URL: https://issues.apache.org/jira/browse/KAFKA-10834
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Konstantine Karantasis
>Assignee: Lev Zemlyanov
>Priority: Minor
> Fix For: 2.8.0
>
>
> Some type casts in the code base are not required any more and can be 
> removed. 
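
For illustration, a generic example of such a cast (not taken from the Connect 
code): once the generic type is known to the compiler, the explicit cast adds 
nothing.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class RedundantCastExample {
    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add("connector");

        // Redundant: get(int) on a List<String> already returns String.
        String first = (String) names.get(0);

        // Equivalent without the cast.
        String second = names.get(0);

        System.out.println(first + " " + second);
    }
}
{code}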





[jira] [Commented] (KAFKA-12314) Leverage custom comparator for optimized range scans on RocksDB

2021-02-08 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281526#comment-17281526
 ] 

Sagar Rao commented on KAFKA-12314:
---

hey [~ableegoldman], is this something I can take up?

> Leverage custom comparator for optimized range scans on RocksDB
> ---
>
> Key: KAFKA-12314
> URL: https://issues.apache.org/jira/browse/KAFKA-12314
> Project: Kafka
>  Issue Type: Improvement
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> Currently our SessionStore has poor performance on any range scans due to the 
> byte layout and possibility of varyingly sized keys. A session window 
> consists of the key and two timestamps, the windowEnd and windowStart. This 
> data is formatted as
> [key, windowEnd, windowStart]
> The default comparator in rocksdb is lexicographical, and so it compares 
> bytes starting with the key. This means with the above format, the records 
> are effectively sorted first by key and then by windowEnd. But if two keys 
> are of different lengths, the comparator will start on the left and end up 
> comparing the tail bytes of the longer key against the windowEnd timestamp of 
> the shorter key. Due to this, we have to set the bounds on SessionStore range 
> scans very conservatively, which means we end up reading way more data than 
> we need.
> One way out of this would be to use a custom comparator which understands the 
> window bytes format we use. So far we haven't done this because of the 
> overhead in crossing the JNI with the Java Comparator; we would need a native 
> comparator to avoid further performance hit.





[jira] [Resolved] (KAFKA-10866) Add fetched metadata to ConsumerRecords

2021-02-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10866.
--
Resolution: Fixed

> Add fetched metadata to ConsumerRecords
> ---
>
> Key: KAFKA-10866
> URL: https://issues.apache.org/jira/browse/KAFKA-10866
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0
>
>
> Consumer-side changes for KIP-695





[jira] [Updated] (KAFKA-10866) Add fetched metadata to ConsumerRecords

2021-02-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10866:
-
Priority: Major  (was: Blocker)

> Add fetched metadata to ConsumerRecords
> ---
>
> Key: KAFKA-10866
> URL: https://issues.apache.org/jira/browse/KAFKA-10866
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.8.0
>
>
> Consumer-side changes for KIP-695





[jira] [Updated] (KAFKA-10866) Add fetched metadata to ConsumerRecords

2021-02-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10866:
-
Priority: Blocker  (was: Major)

> Add fetched metadata to ConsumerRecords
> ---
>
> Key: KAFKA-10866
> URL: https://issues.apache.org/jira/browse/KAFKA-10866
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0
>
>
> Consumer-side changes for KIP-695





[jira] [Reopened] (KAFKA-10866) Add fetched metadata to ConsumerRecords

2021-02-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-10866:
--

> Add fetched metadata to ConsumerRecords
> ---
>
> Key: KAFKA-10866
> URL: https://issues.apache.org/jira/browse/KAFKA-10866
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.8.0
>
>
> Consumer-side changes for KIP-695





[jira] [Commented] (KAFKA-12314) Leverage custom comparator for optimized range scans on RocksDB

2021-02-08 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281502#comment-17281502
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12314:


RocksDB has recently merged some improvements to the performance of their Java 
API. Once we upgrade our rocksdb dependency, we should investigate whether it 
would be possible to implement a more efficient comparator and then tighten up 
the range scan bounds.

> Leverage custom comparator for optimized range scans on RocksDB
> ---
>
> Key: KAFKA-12314
> URL: https://issues.apache.org/jira/browse/KAFKA-12314
> Project: Kafka
>  Issue Type: Improvement
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> Currently our SessionStore has poor performance on any range scans due to the 
> byte layout and possibility of varyingly sized keys. A session window 
> consists of the key and two timestamps, the windowEnd and windowStart. This 
> data is formatted as
> [key, windowEnd, windowStart]
> The default comparator in rocksdb is lexicographical, and so it compares 
> bytes starting with the key. This means with the above format, the records 
> are effectively sorted first by key and then by windowEnd. But if two keys 
> are of different lengths, the comparator will start on the left and end up 
> comparing the tail bytes of the longer key against the windowEnd timestamp of 
> the shorter key. Due to this, we have to set the bounds on SessionStore range 
> scans very conservatively, which means we end up reading way more data than 
> we need.
> One way out of this would be to use a custom comparator which understands the 
> window bytes format we use. So far we haven't done this because of the 
> overhead in crossing the JNI with the Java Comparator; we would need a native 
> comparator to avoid further performance hit.





[GitHub] [kafka] chia7712 merged pull request #10012: MINOR: Refactor return statement and log info

2021-02-08 Thread GitBox


chia7712 merged pull request #10012:
URL: https://github.com/apache/kafka/pull/10012


   







[jira] [Created] (KAFKA-12314) Leverage custom comparator for optimized range scans on RocksDB

2021-02-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12314:
--

 Summary: Leverage custom comparator for optimized range scans on 
RocksDB
 Key: KAFKA-12314
 URL: https://issues.apache.org/jira/browse/KAFKA-12314
 Project: Kafka
  Issue Type: Improvement
Reporter: A. Sophie Blee-Goldman


Currently our SessionStore has poor performance on any range scans due to the 
byte layout and the possibility of variable-length keys. A session window consists 
of the key and two timestamps, the windowEnd and windowStart. This data is 
formatted as

[key, windowEnd, windowStart]

The default comparator in rocksdb is lexicographical, and so it compares bytes 
starting with the key. This means with the above format, the records are 
effectively sorted first by key and then by windowEnd. But if two keys are of 
different lengths, the comparator will start on the left and end up comparing 
the tail bytes of the longer key against the windowEnd timestamp of the shorter 
key. Due to this, we have to set the bounds on SessionStore range scans very 
conservatively, which means we end up reading way more data than we need.

One way out of this would be to use a custom comparator which understands the 
window bytes format we use. So far we haven't done this because of the overhead 
of crossing the JNI boundary with a Java Comparator; we would need a native 
comparator to avoid a further performance hit.
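
To make the problem concrete, a small self-contained sketch (the serialization 
details below are assumptions; only the [key, windowEnd, windowStart] layout is 
from this ticket) showing the default lexicographic comparison pitting key 
bytes against timestamp bytes:

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SessionKeyOrderDemo {
    // Serialize a session key in the [key, windowEnd, windowStart] layout.
    static byte[] sessionKey(String key, long windowEnd, long windowStart) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(k.length + 16)
                .put(k).putLong(windowEnd).putLong(windowStart).array();
    }

    // RocksDB's default comparator: unsigned lexicographic byte order.
    static int lexCompare(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        byte[] shortKey = sessionKey("ab", 100L, 50L);
        byte[] longKey = sessionKey("abc", 100L, 50L);
        // At byte index 2 the comparator pits 'c' (a key byte of the longer
        // key) against 0x00 (the high byte of the shorter key's windowEnd
        // timestamp), so key bytes and timestamp bytes get mixed in the
        // ordering -- which is why the range scan bounds must be conservative.
        System.out.println(lexCompare(shortKey, longKey)); // prints a negative number
    }
}
{code}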





[GitHub] [kafka] tang7526 commented on a change in pull request #10012: MINOR: Refactor return statement and log info

2021-02-08 Thread GitBox


tang7526 commented on a change in pull request #10012:
URL: https://github.com/apache/kafka/pull/10012#discussion_r572543598



##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1334,4 +1334,13 @@ public static long getDateTime(String timestamp) throws 
ParseException, IllegalA
 public static  Iterator covariantCast(Iterator 
iterator) {
 return (Iterator) iterator;
 }
+
+/**
+ * Checks if a string is null, empty or whitespace only.
+ * @param str a string to be checked
+ * @return true if the string is null, empty or whitespace only; 
otherwise, return false.
+ */
+public static boolean isBlank(String str) {

Review comment:
   > How about applying this helper method to the code base?
   
   @chia7712 OK. I will do it in the next PR.
   









[GitHub] [kafka] chia7712 commented on a change in pull request #10083: MINOR: Remove check for comparision of 3rd IP of kafka.apache.org

2021-02-08 Thread GitBox


chia7712 commented on a change in pull request #10083:
URL: https://github.com/apache/kafka/pull/10083#discussion_r572542393



##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -310,9 +310,6 @@ public void testMultipleIPsWithUseAll() throws 
UnknownHostException {
 connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS);
 InetAddress addr2 = connectionStates.currentAddress(nodeId1);

Review comment:
   Relying on a specific number of resolved IPs is still not stable for this 
test (for example, the number of resolved IPs could become 1). It seems to me 
this test should check whether the number of used addresses (after building many 
connections) is equal to the number of resolved addresses.









[jira] [Created] (KAFKA-12313) Consider deprecating the default.windowed.serde.inner.class configs

2021-02-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12313:
--

 Summary: Consider deprecating the 
default.windowed.serde.inner.class configs
 Key: KAFKA-12313
 URL: https://issues.apache.org/jira/browse/KAFKA-12313
 Project: Kafka
  Issue Type: Improvement
Reporter: A. Sophie Blee-Goldman


During the discussion of KIP-659 we considered whether it made sense to have a 
"default" class for the serdes of windowed inner classes across Streams. Using 
these configs instead of specifying an actual Serde object can lead to subtle 
bugs, since the WindowedDeserializer requires a windowSize in addition to the 
inner class. If the default constructor is invoked, as it will be when falling 
back on the config, this windowSize defaults to MAX_VALUE. 

If the downstream program doesn't care about the window end time in the output, 
then this can go unnoticed and technically there is no problem. But if anything 
does depend on the end time, or the user just wants to manually read the output 
for testing purposes, then the MAX_VALUE will result in a garbage timestamp.

We should consider whether the convenience of specifying a config instead of 
instantiating a Serde in each operator is really worth the risk of a user 
accidentally failing to specify a windowSize.
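
For reference, the explicit alternative looks like this (a sketch assuming the 
WindowedSerdes API; the window size and types are made up):

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public class ExplicitWindowedSerdeExample {
    public static void main(String[] args) {
        // Passing the window size explicitly avoids the MAX_VALUE default
        // that the config-based fallback silently uses for the window end.
        long windowSize = Duration.ofMinutes(5).toMillis();
        Serde<Windowed<String>> windowedSerde =
                WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSize);
        System.out.println(windowedSerde.getClass().getName());
    }
}
{code}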





[GitHub] [kafka] chia7712 commented on a change in pull request #10084: MINOR: Rename DecommissionBrokers to UnregisterBrokers

2021-02-08 Thread GitBox


chia7712 commented on a change in pull request #10084:
URL: https://github.com/apache/kafka/pull/10084#discussion_r572538990



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -217,6 +217,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 case ApiKeys.ENVELOPE => handleEnvelope(request)
 case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
 case ApiKeys.DESCRIBE_PRODUCERS => 
handleDescribeProducersRequest(request)
+case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, 
handleUnregisterBrokerRequest)

Review comment:
   The connection for the other KIP-500 APIs is just closed. Maybe this API 
could follow the same pattern?









[jira] [Updated] (KAFKA-8575) Investigate removing EAGER protocol & cleaning up task suspension in Streams rebalancing

2021-02-08 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-8575:
--
Fix Version/s: (was: 3.0.0)
   3.1.0

> Investigate removing EAGER protocol &  cleaning up task suspension in Streams 
> rebalancing
> -
>
> Key: KAFKA-8575
> URL: https://issues.apache.org/jira/browse/KAFKA-8575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 3.1.0
>
>
> With KIP-429 the suspend/resume of tasks may have minimal gains while adding 
> a lot of complexity and potential bugs. We should consider removing/cleaning 
> it up.
> We should also consider removing EAGER rebalancing from Streams entirely, if 
> results indicate that COOPERATIVE displays superior performance. 





[GitHub] [kafka] chia7712 commented on a change in pull request #10012: MINOR: Refactor return statement and log info

2021-02-08 Thread GitBox


chia7712 commented on a change in pull request #10012:
URL: https://github.com/apache/kafka/pull/10012#discussion_r572536164



##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1334,4 +1334,13 @@ public static long getDateTime(String timestamp) throws 
ParseException, IllegalA
 public static  Iterator covariantCast(Iterator 
iterator) {
 return (Iterator) iterator;
 }
+
+/**
+ * Checks if a string is null, empty or whitespace only.
+ * @param str a string to be checked
+ * @return true if the string is null, empty or whitespace only; 
otherwise, return false.
+ */
+public static boolean isBlank(String str) {

Review comment:
   How about applying this helper method to the code base?
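   For example, a hypothetical call site (`requireName` is made up, not from 
the PR; only `Utils.isBlank` is the helper added here):
   ```java
   import org.apache.kafka.common.utils.Utils;

   class BlankCheckExample {
       static void requireName(String name) {
           // Before: if (name == null || name.trim().isEmpty()) ...
           if (Utils.isBlank(name)) {
               throw new IllegalArgumentException("name must not be blank");
           }
       }
   }
   ```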









[jira] [Commented] (KAFKA-12312) kafka_2.13:2.6.1 throws NoSuchMethodError when running against scala-sdk-2.13.4

2021-02-08 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281487#comment-17281487
 ] 

Luke Chen commented on KAFKA-12312:
---

[~tkornai], we upgraded to Scala 2.13.4 and fixed the compatibility issues on 
the latest trunk branch, which should be in the 2.8 release. I'll ask whether we 
want to cherry-pick the fix into the 2.6.2 release. Thanks for reporting.

> kafka_2.13:2.6.1 throws NoSuchMethodError when running against 
> scala-sdk-2.13.4
> ---
>
> Key: KAFKA-12312
> URL: https://issues.apache.org/jira/browse/KAFKA-12312
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0, 2.6.1
>Reporter: Tamas Kornai
>Priority: Minor
>
> The below snippet runs without issues with {{scala-sdk-2.13.3}}, but throws 
> {{NoSuchMethodError}} for {{scala-sdk-2.13.4}}: 
> {quote}val authorize = new AclAuthorizer()
>  val acls = authorize.acls(AclBindingFilter.ANY)
> {quote}
> The error is: 
> {quote}Exception in thread "main" java.lang.NoSuchMethodError: 
> 'scala.collection.immutable.RedBlackTree$Tree 
> scala.collection.immutable.TreeMap.scala$collection$immutable$TreeMap$$tree()'
>  at kafka.security.authorizer.AclAuthorizer.acls(AclAuthorizer.scala:293)
> {quote}





[GitHub] [kafka] gitlw commented on pull request #7670: KAFKA-7016: Not hide the stack trace for ApiException

2021-02-08 Thread GitBox


gitlw commented on pull request #7670:
URL: https://github.com/apache/kafka/pull/7670#issuecomment-775603648


   @guozhangwang I'm troubleshooting a broker-side issue by following this 
exception:
   ```
   2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
   org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139
   ```
   Troubleshooting is very painful without the stack trace. When do you think 
we can get this PR merged?







[GitHub] [kafka] chia7712 merged pull request #10079: MINOR: replace hard-coding utf-8 with StandardCharsets.UTF_8

2021-02-08 Thread GitBox


chia7712 merged pull request #10079:
URL: https://github.com/apache/kafka/pull/10079


   







[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572512463



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -353,50 +508,68 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time   The clock to use.
   * @param cache  The fetch session cache.
   * @param reqMetadataThe request metadata.
-  * @param fetchData  The partition data from the fetch request.
+  * @param fetchDataAndError  The partition data and topic ID errors from the 
fetch request.
+  * @param topicIds   The map from topic name to topic IDs
   * @param isFromFollower True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
private val cache: FetchSessionCache,
private val reqMetadata: JFetchMetadata,
-   private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+   private val fetchDataAndError: FetchDataAndError,
+   private val topicIds: util.Map[String, Uuid],
private val isFromFollower: Boolean) extends 
FetchContext {
+  val idErrors = new util.LinkedList(fetchDataAndError.idErrors.values())
   override def getFetchOffset(part: TopicPartition): Option[Long] =
-Option(fetchData.get(part)).map(_.fetchOffset)
+Option(fetchDataAndError.fetchData.get(part)).map(_.fetchOffset)
 
   override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
-fetchData.forEach(fun(_, _))
+fetchDataAndError.fetchData.forEach(fun(_, _))

Review comment:
   I was referring to the following code. It seems we need to iterate over 
every partition through fetchContext so that the UNKNOWN_TOPIC_OR_PARTITION 
error code can be added for each partition.
   
   ```
 if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
   fetchContext.foreachPartition { (topicPartition, data) =>
 if (!metadataCache.contains(topicPartition))
   erroneous += topicPartition -> 
errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
   ```









[GitHub] [kafka] junrao commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


junrao commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572510600



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File,
   val topicPartition: TopicPartition,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
-  private val hadCleanShutdown: Boolean = true) extends Logging with 
KafkaMetricsGroup {
+  private val hadCleanShutdown: Boolean = true,
+  val keepPartitionMetadataFile: Boolean = true) extends Logging with 
KafkaMetricsGroup {

Review comment:
   Yes, looks good to me.









[GitHub] [kafka] cmccabe commented on pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#issuecomment-775590227


   looks like there is a checkstyle issue:
   ```
   [2021-02-08T23:40:02.927Z] [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-9986/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:20:
 Unused import
   [2021-02-08T23:40:02.927Z] [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-9986/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala:91:
 @nowarn annotation does not suppress any warnings
   ```







[GitHub] [kafka] cmccabe commented on pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#issuecomment-775589690


   LGTM.  Thanks, @mumrah 







[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572505265



##
File path: core/src/test/java/kafka/test/ClusterConfig.java
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Represents a requested configuration of a Kafka cluster for integration 
testing
+ */
+public class ClusterConfig {
+
+private final Type type;
+private final int brokers;
+private final int controllers;
+private final String name;
+private final boolean autoStart;
+
+private final String securityProtocol;
+private final String listenerName;
+private final File trustStoreFile;
+
+private final Properties serverProperties = new Properties();
+private final Properties producerProperties = new Properties();
+private final Properties consumerProperties = new Properties();
+private final Properties adminClientProperties = new Properties();
+private final Properties saslServerProperties = new Properties();
+private final Properties saslClientProperties = new Properties();
+
+ClusterConfig(Type type, int brokers, int controllers, String name, 
boolean autoStart,
+  String securityProtocol, String listenerName, File 
trustStoreFile) {
+this.type = type;
+this.brokers = brokers;
+this.controllers = controllers;
+this.name = name;
+this.autoStart = autoStart;
+this.securityProtocol = securityProtocol;
+this.listenerName = listenerName;
+this.trustStoreFile = trustStoreFile;
+}
+
+public Type clusterType() {
+return type;
+}
+
+public int brokers() {
+return brokers;
+}
+
+public int controllers() {
+return controllers;

Review comment:
   ok









[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572482465



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File,
   val topicPartition: TopicPartition,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
-  private val hadCleanShutdown: Boolean = true) extends Logging with 
KafkaMetricsGroup {
+  private val hadCleanShutdown: Boolean = true,
+  val keepPartitionMetadataFile: Boolean = true) extends Logging with 
KafkaMetricsGroup {

Review comment:
   Would something like this work as an explanation?
   ```
   * boolean flag to indicate whether the partition.metadata file should be 
kept in the 
   * log directory. A partition.metadata file is only created when the 
controller's 
   * inter-broker protocol version is at least 2.8. This file will persist the 
topic ID on
   * the broker. If inter-broker protocol is downgraded below 2.8, a topic ID 
may be lost
   * and a new ID generated upon re-upgrade. If the inter-broker protocol 
version is below
   * 2.8, partition.metadata will be deleted to avoid ID conflicts upon 
re-upgrade. 
```









[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572475420



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File,
   val topicPartition: TopicPartition,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
-  private val hadCleanShutdown: Boolean = true) extends Logging with 
KafkaMetricsGroup {
+  private val hadCleanShutdown: Boolean = true,
+  val keepPartitionMetadataFile: Boolean = true) extends Logging with 
KafkaMetricsGroup {

Review comment:
   good point. will do!









[GitHub] [kafka] junrao commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


junrao commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572474952



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File,
   val topicPartition: TopicPartition,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
-  private val hadCleanShutdown: Boolean = true) extends Logging with 
KafkaMetricsGroup {
+  private val hadCleanShutdown: Boolean = true,
+  val keepPartitionMetadataFile: Boolean = true) extends Logging with 
KafkaMetricsGroup {

Review comment:
   Could we add the new param to the javadoc? In the javadoc, it would be 
useful to explain a bit how this helps with re-upgrade.









[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572475014



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -353,50 +508,68 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time   The clock to use.
   * @param cache  The fetch session cache.
   * @param reqMetadataThe request metadata.
-  * @param fetchData  The partition data from the fetch request.
+  * @param fetchDataAndError  The partition data and topic ID errors from the 
fetch request.
+  * @param topicIds   The map from topic name to topic IDs
   * @param isFromFollower True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
private val cache: FetchSessionCache,
private val reqMetadata: JFetchMetadata,
-   private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+   private val fetchDataAndError: FetchDataAndError,
+   private val topicIds: util.Map[String, Uuid],
private val isFromFollower: Boolean) extends 
FetchContext {
+  val idErrors = new util.LinkedList(fetchDataAndError.idErrors.values())
   override def getFetchOffset(part: TopicPartition): Option[Long] =
-Option(fetchData.get(part)).map(_.fetchOffset)
+Option(fetchDataAndError.fetchData.get(part)).map(_.fetchOffset)
 
   override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
-fetchData.forEach(fun(_, _))
+fetchDataAndError.fetchData.forEach(fun(_, _))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+FetchResponse.sizeOf(versionId, updates.entrySet.iterator, idErrors, 
topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): 
FetchResponse[Records] = {
 def createNewSession: FetchSession.CACHE_MAP = {
   val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
   updates.forEach { (part, respData) =>
-val reqData = fetchData.get(part)
-cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+val reqData = fetchDataAndError.fetchData.get(part)
+cachedPartitions.mustAdd(new CachedPartition(part, 
topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID), reqData, respData))

Review comment:
   I think this is used for handling the case of older request versions.









[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572472724



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
- toForget: util.List[TopicPartition],
- reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+ fetchDataAndError: FetchDataAndError,
+ toForgetAndIds: ToForgetAndIds,
+ reqMetadata: JFetchMetadata,
+ topicIds: util.Map[String, Uuid],
+ topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
 val added = new TL
 val updated = new TL
 val removed = new TL
-fetchData.forEach { (topicPart, reqData) =>
-  val newCachedPart = new CachedPartition(topicPart, reqData)
+
+// Only make changes to topic IDs if we have a new request version.
+// If we receive an old request version, ignore all topic ID code, keep 
IDs that are there.
+if (version >= 13) {

Review comment:
   I think we do since this is looking at the partitions cached in the 
session. I'll take another look though.









[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572470829



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -353,50 +508,68 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time   The clock to use.
   * @param cache  The fetch session cache.
   * @param reqMetadataThe request metadata.
-  * @param fetchData  The partition data from the fetch request.
+  * @param fetchDataAndError  The partition data and topic ID errors from the 
fetch request.
+  * @param topicIds   The map from topic name to topic IDs
   * @param isFromFollower True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
private val cache: FetchSessionCache,
private val reqMetadata: JFetchMetadata,
-   private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+   private val fetchDataAndError: FetchDataAndError,
+   private val topicIds: util.Map[String, Uuid],
private val isFromFollower: Boolean) extends 
FetchContext {
+  val idErrors = new util.LinkedList(fetchDataAndError.idErrors.values())
   override def getFetchOffset(part: TopicPartition): Option[Long] =
-Option(fetchData.get(part)).map(_.fetchOffset)
+Option(fetchDataAndError.fetchData.get(part)).map(_.fetchOffset)
 
   override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
-fetchData.forEach(fun(_, _))
+fetchDataAndError.fetchData.forEach(fun(_, _))

Review comment:
   When you mention that some callers need to iterate over all partitions 
like CLUSTER ACTION -- I'm a little confused. I thought the request context was 
passed into AuthHelper for that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key

2021-02-08 Thread Piotr Fras (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281463#comment-17281463
 ] 

Piotr Fras commented on KAFKA-12213:


Hi [~bbejeck], how are things with KIP-149 progressing? Is there anything I can 
help with?

> Kafka Streams aggregation Initializer to accept record key
> --
>
> Key: KAFKA-12213
> URL: https://issues.apache.org/jira/browse/KAFKA-12213
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Piotr Fras
>Assignee: Piotr Fras
>Priority: Minor
>  Labels: needs-kip
>
> Sometimes the Kafka record key contains useful information for creating a zero 
> object in the aggregation Initializer. This feature adds the Kafka record key to 
> the Initializer.
> There were two approaches I considered to implement this feature, one 
> respecting backwards compatibility for internal and external APIs and the 
> other one which is not. I chose the latter as it was more straightforward. 
> We may want to validate this approach, though.
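
One possible shape for the feature (a hypothetical sketch, not the final API from
the PR): an initializer variant that receives the record key when producing the
zero value.

```java
// Hypothetical: like org.apache.kafka.streams.kstream.Initializer, but keyed.
@FunctionalInterface
public interface KeyedInitializer<K, VA> {
    VA apply(K key);
}

// Usage sketch: seed each aggregate from its key.
// KeyedInitializer<String, StringBuilder> init = key -> new StringBuilder(key);
```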



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio opened a new pull request #10085: KAFKA-12154: Snapshot Loading API

2021-02-08 Thread GitBox


jsancio opened a new pull request #10085:
URL: https://github.com/apache/kafka/pull/10085


   WIP
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


junrao commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572450889



##
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##
@@ -217,6 +217,7 @@ class LogManagerTest {
 }
 assertTrue(log.numberOfSegments > 1, "There should be more than one 
segment now.")
 log.updateHighWatermark(log.logEndOffset)
+log.partitionMetadataFile.get.write(Uuid.randomUuid())

Review comment:
   That's what I am thinking. It's kind of weird to add the new file just 
to make the existing math work.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572445245



##
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##
@@ -217,6 +217,7 @@ class LogManagerTest {
 }
 assertTrue(log.numberOfSegments > 1, "There should be more than one 
segment now.")
 log.updateHighWatermark(log.logEndOffset)
+log.partitionMetadataFile.get.write(Uuid.randomUuid())

Review comment:
   So remove the creation and subtract one file?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


junrao commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r57205



##
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##
@@ -217,6 +217,7 @@ class LogManagerTest {
 }
 assertTrue(log.numberOfSegments > 1, "There should be more than one 
segment now.")
 log.updateHighWatermark(log.logEndOffset)
+log.partitionMetadataFile.get.write(Uuid.randomUuid())

Review comment:
   Got it. Since the partitionMetadataFile is now created on demand, 
perhaps we could just change the math on the expected number of files?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r572443431



##
File path: generator/src/main/java/org/apache/kafka/message/RequestApiScope.java
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.message;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum RequestApiScope {
+@JsonProperty("zkBroker")
+ZK_BROKER,
+
+@JsonProperty("broker")
+BROKER,
+
+@JsonProperty("controller")
+CONTROLLER,
+
+@JsonProperty("raft")
+RAFT;

Review comment:
   @ijuma : unfortunately we don't have a clear separation between clients 
and brokers at the protocol level.  As you know, if a random node connects to 
the broker and asks for ApiVersions, the broker doesn't (yet) know if the other 
node is another broker or if it's a client.  So it wouldn't help to label APIs 
as CLIENT vs. BROKER, since the broker has to send back both anyway.
   
   On the other hand, we can definitely put APIs into "zk broker", "kip-500 
broker", and "kip-500 controller" buckets and use those buckets to figure out 
what to send in the ApiVersionsResponse.
   
   @hachikuji : The "raft" scope seems a little weird since all the other 
scopes map to listeners on servers.  The TestRaftServer is just an internal 
junit thing, right?  So it's probably fine if it just uses the Controller scope 
and returns `UnsupportedVersionException` for things it doesn't support.  I 
don't think we should have a separate scope for this unless it somehow impacts 
the real servers.
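   
   To make the bucketing concrete, here is a rough sketch (hypothetical names, 
not the PR's code) of filtering what a given listener advertises:
   
   ```java
   import java.util.ArrayList;
   import java.util.EnumSet;
   import java.util.List;
   
   enum ListenerScope { ZK_BROKER, BROKER, CONTROLLER }
   
   class ScopedApi {
       final String name;
       final EnumSet<ListenerScope> scopes;
       ScopedApi(String name, EnumSet<ListenerScope> scopes) {
           this.name = name;
           this.scopes = scopes;
       }
   }
   
   class ApiVersionsFilter {
       // A listener only advertises the APIs whose declared scopes include it.
       static List<ScopedApi> exposed(List<ScopedApi> all, ListenerScope scope) {
           List<ScopedApi> result = new ArrayList<>();
           for (ScopedApi api : all) {
               if (api.scopes.contains(scope)) {
                   result.add(api);
               }
           }
           return result;
       }
   }
   ```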





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

2021-02-08 Thread GitBox


rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r572442503



##
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##
@@ -39,11 +41,30 @@ object MetadataPartition {
   record.isr(),
   Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas
   Collections.emptyList(),
-  Collections.emptyList())
+  Collections.emptyList(),
+  largestDeferredOffsetEverSeen = 
deferredAtOffset.getOrElse(OffsetNeverDeferred),
+  isCurrentlyDeferringChanges = deferredAtOffset.isDefined)

Review comment:
   We basically use `largestDeferredOffsetEverSeen` only for logging at 
this point -- we also check it in a few `private def sanityCheckState...()` 
`RaftReplicaManager` methods.  We could completely eliminate 
`largestDeferredOffsetEverSeen` if we didn't want to log when the partition was 
last deferred.  It just tracks when the partition was last seen and the change 
at that offset was deferred rather than directly applied.  Once the partition 
is no longer deferred the value remains whatever it was and the boolean flips 
to `false`.
   
   It does seem on the surface that we could change the declaration to 
`deferredSinceOffset` and get rid of the boolean -- and `deferredSinceOffset` 
would change to `-1` once those changes are applied.  But there is a problem 
with this if the partition changes to not being deferred in the metadata cache 
before we ask `RaftReplicaManager` to process all of its deferred changes: the 
value will be -1 in the metadata cache under those circumstances, and we 
wouldn't have the value to log.
   
   So I think we have a few options.
   
   1. Do the logging, apply the changes to the metadata cache before replica 
manager, and keep the `Long` and `Boolean` as currently defined
   2. Do the logging, apply the changes to the metadata cache **after** replica 
manager, and use just a `Long` (with the semantics changed as described 
above; see the sketch below)
   3. Just use a `Boolean` and don't do the logging.
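   
   A minimal sketch of option 2 (all names assumed, not code from the PR): a 
single `Long` with a negative sentinel replaces the `Boolean`:
   
   ```scala
   // Sketch only: -1 means "not currently deferring"; any non-negative value
   // is the offset at which a change for this partition was last deferred.
   case class DeferredState(deferredSinceOffset: Long = -1L) {
     def isCurrentlyDeferringChanges: Boolean = deferredSinceOffset >= 0
     def defer(offset: Long): DeferredState = copy(deferredSinceOffset = offset)
     def applyDeferred(): DeferredState = copy(deferredSinceOffset = -1L)
   }
   ```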
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10069: MINOR: Add RaftReplicaManager

2021-02-08 Thread GitBox


cmccabe commented on pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#issuecomment-775521260


   Thanks for this PR, @rondagostino !
   
   I can see why you wanted to have `RaftReplicaChangeDelegateHelper`.  The 
`ReplicaManager` is not very easy to unit test because it has grown so large.  
I don't think this delegate thing is quite the right abstraction here-- it's 
pretty confusing-- but I guess let's revisit this after 2.8 is finished.
   
   I suppose one option is, once `ReplicaManager` is a pure interface, we can 
split the kip-500 update logic off into a separate set of functions that takes 
a `ReplicaManager` as an input.  Then we can easily unit-test the update logic 
with a `MockReplicaManager`.
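   
   As a rough sketch of that idea (all names assumed), the update logic could 
become free functions over the interface, exercised against a mock in tests:
   
   ```scala
   trait ReplicaManagerApi {
     def becomeLeaderOrFollower(topicPartition: String, leaderId: Int): Unit
   }
   
   object RaftMetadataApplier {
     // Pure function of (replica manager, changes); easy to drive with a mock.
     def applyChanges(rm: ReplicaManagerApi, changes: Seq[(String, Int)]): Unit =
       changes.foreach { case (tp, leader) => rm.becomeLeaderOrFollower(tp, leader) }
   }
   ```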
   
   For now I left two small comments... LGTM after those are addressed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r572433038



##
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##
@@ -30,7 +31,8 @@ import scala.jdk.CollectionConverters._
 
 
 object MetadataPartition {
-  def apply(name: String, record: PartitionRecord): MetadataPartition = {
+  val OffsetNeverDeferred = 0L // must not be a valid offset we could see 
(i.e. must not be positive)

Review comment:
   Can you add JavaDoc for this?
   
   Also, what about `NoDeferredOffset` as a name?
   
   One last question... why is this 0 and not -1?  0 is a valid offset in the 
log, whereas -1 is not.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r572433443



##
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##
@@ -39,11 +41,30 @@ object MetadataPartition {
   record.isr(),
   Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas
   Collections.emptyList(),
-  Collections.emptyList())
+  Collections.emptyList(),
+  largestDeferredOffsetEverSeen = 
deferredAtOffset.getOrElse(OffsetNeverDeferred),
+  isCurrentlyDeferringChanges = deferredAtOffset.isDefined)

Review comment:
   Hmm... why do we need this boolean?  Can't we just check if 
`largestDeferredOffsetEverSeen` is not `OffsetNeverDeferred`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572421631



##
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##
@@ -217,6 +217,7 @@ class LogManagerTest {
 }
 assertTrue(log.numberOfSegments > 1, "There should be more than one 
segment now.")
 log.updateHighWatermark(log.logEndOffset)
+log.partitionMetadataFile.get.write(Uuid.randomUuid())

Review comment:
   I wanted to test that the correct files are kept/deleted. I could also change 
the math on line 234 if we don't need this test.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

2021-02-08 Thread GitBox


rhauch commented on pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#issuecomment-775505877


   Added another commit that uses Mockito in the new `SharedTopicAdminTest` 
class to do what @kkonstantine had suggested and which was not possible with 
EasyMock. 
   
   Mockito FTW!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

2021-02-08 Thread GitBox


rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r572421771



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+private static final Map<String, String> CONFIG = Collections.emptyMap();
+
+@Mock private TopicAdmin mockTopicAdmin;
+private SharedTopicAdmin sharedAdmin;
+private int created = 0;

Review comment:
   Okay, Mockito FTW! I've rewritten the `SharedTopicAdminTest` class to 
use Mockito instead of PowerMock and EasyMock, and was able to use mocks to 
assert the correct number of times an admin instance was created and closed.
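   
   For readers following along, a minimal standalone Mockito sketch of that kind 
of count assertion (not the actual `SharedTopicAdminTest` code; the factory 
interface here is made up):
   
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.times;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;
   
   import org.junit.Test;
   
   public class MockCountSketchTest {
       interface AdminFactory { AutoCloseable create(); }
   
       @Test
       public void verifiesCreateAndCloseCounts() throws Exception {
           AdminFactory factory = mock(AdminFactory.class);
           AutoCloseable admin = mock(AutoCloseable.class);
           when(factory.create()).thenReturn(admin);
   
           factory.create();          // first creation
           factory.create().close();  // second creation, then close
   
           verify(factory, times(2)).create();
           verify(admin, times(1)).close();
       }
   }
   ```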





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572421631



##
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##
@@ -217,6 +217,7 @@ class LogManagerTest {
 }
 assertTrue(log.numberOfSegments > 1, "There should be more than one 
segment now.")
 log.updateHighWatermark(log.logEndOffset)
+log.partitionMetadataFile.get.write(Uuid.randomUuid())

Review comment:
   I wanted to test that the file is actually getting deleted. In order to 
check it gets deleted it has to be created. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572420716



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -341,10 +342,15 @@ class Log(@volatile private var _dir: File,
 
producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq)
 loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown)
 
-// Recover topic ID if present
+// Delete partition metadata file if the version does not support topic 
IDs.
+// Recover topic ID if present and topic IDs are supported
 partitionMetadataFile.foreach { file =>
-  if (!file.isEmpty())
-topicId = file.read().topicId
+  if (file.exists()) {
+if (!usesTopicId)
+  file.delete()

Review comment:
   I modeled this off of `leaderEpochCache`, which is also an `Option`. But I 
see how that is different now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572419692



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File,
   val topicPartition: TopicPartition,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
-  private val hadCleanShutdown: Boolean = true) extends Logging with 
KafkaMetricsGroup {
+  private val hadCleanShutdown: Boolean = true,
+  val usesTopicId: Boolean = true) extends Logging with 
KafkaMetricsGroup {

Review comment:
   I named this based on the config's name. But the name you suggested is 
more descriptive.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10041: MINOR: Prevent creating partition.metadata until ID can be written

2021-02-08 Thread GitBox


junrao commented on a change in pull request #10041:
URL: https://github.com/apache/kafka/pull/10041#discussion_r572399914



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -341,10 +342,15 @@ class Log(@volatile private var _dir: File,
 
producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq)
 loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown)
 
-// Recover topic ID if present
+// Delete partition metadata file if the version does not support topic 
IDs.
+// Recover topic ID if present and topic IDs are supported
 partitionMetadataFile.foreach { file =>
-  if (!file.isEmpty())
-topicId = file.read().topicId
+  if (file.exists()) {
+if (!usesTopicId)
+  file.delete()

Review comment:
   Does `partitionMetadataFile` need to be an `Option`? It seems that we can just 
always instantiate the object.

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -256,7 +256,8 @@ class Log(@volatile private var _dir: File,
   val topicPartition: TopicPartition,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
-  private val hadCleanShutdown: Boolean = true) extends Logging with 
KafkaMetricsGroup {
+  private val hadCleanShutdown: Boolean = true,
+  val usesTopicId: Boolean = true) extends Logging with 
KafkaMetricsGroup {

Review comment:
   Could we add the new param to the javadoc? Also, would 
`keepPartitionMetadataFile` be a better name than `usesTopicId`?

##
File path: core/src/main/scala/kafka/server/PartitionMetadataFile.scala
##
@@ -91,11 +91,10 @@ class PartitionMetadataFile(val file: File,
   private val lock = new Object()
   private val logDir = file.getParentFile.getParent
 
-
-  try Files.createFile(file.toPath) // create the file if it doesn't exist
-  catch { case _: FileAlreadyExistsException => }
-
   def write(topicId: Uuid): Unit = {
+try Files.createFile(file.toPath) // create the file if it doesn't exist

Review comment:
   Do we need to create the file first? It seems that later on we always 
rename the temp file to this one.
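   
   For context, the write-then-rename pattern being referenced, as a standalone 
sketch (paths and contents are placeholders):
   
   ```scala
   import java.nio.charset.StandardCharsets
   import java.nio.file.{Files, Paths, StandardCopyOption}
   
   val target = Paths.get("partition.metadata")
   val tmp = Paths.get("partition.metadata.tmp")
   // Write everything to a temp file, then atomically move it over the target;
   // the target file never needs to be pre-created.
   Files.write(tmp, "version: 0\n".getBytes(StandardCharsets.UTF_8))
   Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE,
     StandardCopyOption.REPLACE_EXISTING)
   ```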
   

##
File path: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
##
@@ -217,6 +217,7 @@ class LogManagerTest {
 }
 assertTrue(log.numberOfSegments > 1, "There should be more than one 
segment now.")
 log.updateHighWatermark(log.logEndOffset)
+log.partitionMetadataFile.get.write(Uuid.randomUuid())

Review comment:
   Is this needed? It seems Log never reads the UUID. Ditto below.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572413017



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -71,6 +75,7 @@ object FetchSession {
   * localLogStartOffset is the log start offset of the partition on this 
broker.
   */
 class CachedPartition(val topic: String,
+  var topicId: Uuid,

Review comment:
   I thought about this, but I was worried about some weirdness where we 
need to support partitions before and after they have an ID. (The partitions 
are technically equivalent, but equals wouldn't reflect that.) This may also 
cause problems in cases where IDs may change. Consider the code `val 
cachedPart = session.partitionMap.find(new CachedPartition(topicPart))`. This 
is used in the path of deleting partitions with stale IDs. We would need to 
know the topic ID to find the partition here. I could see potential issues 
where we no longer have the ID and would have trouble removing it.
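   
   In other words (a sketch with assumed fields, not the PR's class): equality 
stays keyed on name/partition so an entry cached before its ID was known can 
still be found:
   
   ```scala
   import org.apache.kafka.common.Uuid
   
   class CachedPartitionSketch(val topic: String, val partition: Int,
                               var topicId: Uuid = Uuid.ZERO_UUID) {
     // topicId is mutable session state and deliberately excluded from
     // equals/hashCode, so find() with a name-only key still matches.
     override def equals(that: Any): Boolean = that match {
       case o: CachedPartitionSketch => topic == o.topic && partition == o.partition
       case _ => false
     }
     override def hashCode: Int = (topic, partition).##
   }
   ```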





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #10084: MINOR: Rename DecommissionBrokers to UnregisterBrokers

2021-02-08 Thread GitBox


cmccabe opened a new pull request #10084:
URL: https://github.com/apache/kafka/pull/10084


   Rename DecommissionBrokers to UnregisterBrokers.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


mumrah commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572393736



##
File path: core/src/test/java/kafka/test/ClusterConfig.java
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Represents a requested configuration of a Kafka cluster for integration 
testing
+ */
+public class ClusterConfig {
+
+private final Type type;
+private final int brokers;
+private final int controllers;
+private final String name;
+private final boolean autoStart;
+
+private final String securityProtocol;
+private final String listenerName;
+private final File trustStoreFile;
+
+private final Properties serverProperties = new Properties();
+private final Properties producerProperties = new Properties();
+private final Properties consumerProperties = new Properties();
+private final Properties adminClientProperties = new Properties();
+private final Properties saslServerProperties = new Properties();
+private final Properties saslClientProperties = new Properties();
+
+ClusterConfig(Type type, int brokers, int controllers, String name, 
boolean autoStart,
+  String securityProtocol, String listenerName, File 
trustStoreFile) {
+this.type = type;
+this.brokers = brokers;
+this.controllers = controllers;
+this.name = name;
+this.autoStart = autoStart;
+this.securityProtocol = securityProtocol;
+this.listenerName = listenerName;
+this.trustStoreFile = trustStoreFile;
+}
+
+public Type clusterType() {
+return type;
+}
+
+public int brokers() {
+return brokers;
+}
+
+public int controllers() {
+return controllers;

Review comment:
   In ZK mode, the number of controllers is basically ignored. I can add some 
logic to validate that someone hasn't mistakenly requested more than one 
controller for a ZK cluster.
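   
   Something along these lines (a sketch against the class above; `Type.ZK` and 
the exception choice are assumptions):
   
   ```java
   static void validateControllers(ClusterConfig config) {
       // ZK clusters have exactly one controller, elected internally; reject a
       // config that asks for more rather than silently ignoring it.
       if (config.clusterType() == Type.ZK && config.controllers() > 1) {
           throw new IllegalArgumentException(
               "ZK clusters ignore the controller count; requested " + config.controllers());
       }
   }
   ```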





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572392149



##
File path: core/src/test/java/kafka/test/annotation/AutoStart.java
##
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+public enum AutoStart {

Review comment:
   Thanks for the explanation.  That makes sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


mumrah commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572387041



##
File path: core/src/test/java/kafka/test/annotation/AutoStart.java
##
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+public enum AutoStart {

Review comment:
   Since we can't use `null` as defaults for annotations, I added DEFAULT 
enum constants for cluster type and auto start attributes. When processing the 
`@ClusterTest` method annotations, if we see Type.DEFAULT or AutoStart.DEFAULT, 
we know the test method is saying "use the default". We could simply put the 
defaults right in the `ClusterTest` annotation, but then we would have an issue 
when method-level `@ClusterTest` and class-level `@ClusterTestDefaults` are 
both in use. How would we know which to use? 
   
   I added AutoStart.DEFAULT in order to let the method-level annotation 
explicitly defer to the default. Here's an example:
   
   ```java
   // indicate that for this test, we want to disable auto-start by default
   @ClusterTestDefaults(autoStart = false)
   public class TestClass {
 
 // but this test does want auto start
 @ClusterTest(autoStart = AutoStart.YES)
 public void test1() { }
   
 // this test is unnecessarily setting auto start as false
 @ClusterTest(autoStart = AutoStart.NO)
 public void test2() { }
   
 // this test will get its auto start value from the class level
 @ClusterTest
 public void test3() { }
   }
   ```
   
   An alternative would be to allow for a class-level annotation that 
completely overrides any method-level `@ClusterTest`, but that seems a little 
backwards.
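   
   For completeness, the enum itself presumably looks like this (sketch):
   
   ```java
   public enum AutoStart {
       YES,
       NO,
       DEFAULT  // defer to the class-level @ClusterTestDefaults value
   }
   ```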





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572381787



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -143,34 +275,71 @@ public String toString() {
 return result;
 }
 
+private List 
toForgottenTopicList(List forgottenTopics, 
Map topicNames) {
+List result = new ArrayList<>();
+forgottenTopics.forEach(forgottenTopic ->
+forgottenTopic.partitions().forEach(partitionId -> {
+String name = topicNames.get(forgottenTopic.topicId());
+if (name != null)
+result.add(new TopicPartition(forgottenTopic.topic(), 
partitionId));
+})
+);
+return result;
+}
+
+// Only used when Fetch is version 13 or greater.
+private ToForgetAndIds 
toForgottenTopicListAndIds(List 
forgottenTopics, Map topicNames) {
+List result = new ArrayList<>();
+Map> unresolvedIds = new HashMap<>();
+forgottenTopics.forEach(forgottenTopic -> {
+Set partitions = new HashSet<>();
+forgottenTopic.partitions().forEach(partitionId -> {
+String name = topicNames.get(forgottenTopic.topicId());
+if (name != null)
+result.add(new TopicPartition(forgottenTopic.topic(), 
partitionId));
+else
+partitions.add(partitionId);
+});
+if (unresolvedIds.containsKey(forgottenTopic.topicId())) {
+unresolvedIds.get(forgottenTopic.topicId()).addAll(partitions);
+} else {
+unresolvedIds.put(forgottenTopic.topicId(), partitions);

Review comment:
   Ah. This is confusing due to how I named things. Basically, I'm 
collecting a set of partitions (`partitions`) for a given topic where the ID was 
not resolved, then adding them to `unresolvedIds`, which is a mapping from the 
topic ID to all the partitions that should be forgotten. I can rename and add 
comments to clarify what is happening here.
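   
   As an aside, the containsKey/get/put sequence above can be collapsed with 
`computeIfAbsent`; a standalone sketch:
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   import org.apache.kafka.common.Uuid;
   
   public class UnresolvedIdsSketch {
       public static void main(String[] args) {
           Map<Uuid, Set<Integer>> unresolvedIds = new HashMap<>();
           Uuid topicId = Uuid.randomUuid();
           Set<Integer> partitions = new HashSet<>(Arrays.asList(0, 1));
           // Creates the set on first sight of the ID, then accumulates into it.
           unresolvedIds.computeIfAbsent(topicId, id -> new HashSet<>()).addAll(partitions);
           System.out.println(unresolvedIds);
       }
   }
   ```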





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572377972



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -133,6 +200,71 @@ public String toString() {
 return Collections.unmodifiableMap(result);
 }
 
+private Map 
toPartitionDataMap(List fetchableTopics, Map topicNames) {
+Map result = new LinkedHashMap<>();
+fetchableTopics.forEach(fetchTopic -> {
+String name = topicNames.get(fetchTopic.topicId());
+if (name != null) {
+fetchTopic.partitions().forEach(fetchPartition ->
+result.put(new TopicPartition(name, 
fetchPartition.partition()),
+new PartitionData(
+fetchPartition.fetchOffset(),
+fetchPartition.logStartOffset(),
+fetchPartition.partitionMaxBytes(),
+
optionalEpoch(fetchPartition.currentLeaderEpoch()),
+
optionalEpoch(fetchPartition.lastFetchedEpoch())
+)
+)
+);
+}
+});
+return Collections.unmodifiableMap(result);
+}
+
+// Only used when Fetch is version 13 or greater.
+private FetchDataAndError 
toPartitionDataMapAndError(List fetchableTopics, 
Map topicNames) {
+Map fetchData = new LinkedHashMap<>();
+List unresolvedPartitions = new LinkedList<>();
+Map idErrors = new HashMap<>();
+Errors error;
+if (topicNames.isEmpty()) {
+error = Errors.UNSUPPORTED_VERSION;
+} else {
+error = Errors.UNKNOWN_TOPIC_ID;
+}
+fetchableTopics.forEach(fetchTopic -> {
+String name = topicNames.get(fetchTopic.topicId());
+if (name != null) {
+fetchTopic.partitions().forEach(fetchPartition ->
+fetchData.put(new TopicPartition(name, 
fetchPartition.partition()),
+new PartitionData(
+fetchPartition.fetchOffset(),
+fetchPartition.logStartOffset(),
+fetchPartition.partitionMaxBytes(),
+
optionalEpoch(fetchPartition.currentLeaderEpoch()),
+
optionalEpoch(fetchPartition.lastFetchedEpoch())
+)
+)
+);
+} else {
+unresolvedPartitions.add(new 
UnresolvedPartitions(fetchTopic.topicId(), 
fetchTopic.partitions().stream().collect(Collectors.toMap(
+FetchRequestData.FetchPartition::partition, 
fetchPartition -> new PartitionData(
+fetchPartition.fetchOffset(),
+fetchPartition.logStartOffset(),
+fetchPartition.partitionMaxBytes(),
+optionalEpoch(fetchPartition.currentLeaderEpoch()),
+optionalEpoch(fetchPartition.lastFetchedEpoch()));
+
+if (idErrors.containsKey(fetchTopic.topicId()))
+
idErrors.get(fetchTopic.topicId()).addPartitions(fetchTopic.partitions().stream().map(part
 -> part.partition()).collect(Collectors.toList()));

Review comment:
   I realize this is a bit confusing. The addPartitions method takes a list. 
What this line is doing is grabbing the idError object and adding partitions 
to it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572373623



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -425,16 +598,26 @@ class IncrementalFetchContext(private val time: Time,
 val topicPart = element.getKey
 val respData = element.getValue
 val cachedPart = session.partitionMap.find(new 
CachedPartition(topicPart))
-val mustRespond = cachedPart.maybeUpdateResponseData(respData, 
updateFetchContextAndRemoveUnselected)
-if (mustRespond) {
+
+if (cachedPart.topicId == Uuid.ZERO_UUID)
+  cachedPart.addId(topicIds.getOrDefault(topicPart.topic, 
Uuid.ZERO_UUID))
+
+if (cachedPart.topicId != topicIds.getOrDefault(topicPart.topic, 
Uuid.ZERO_UUID)) {
   nextElement = element
-  if (updateFetchContextAndRemoveUnselected) {
-session.partitionMap.remove(cachedPart)
-session.partitionMap.mustAdd(cachedPart)
-  }
+  session.partitionMap.remove(cachedPart)
+  iter.remove()

Review comment:
   If we run into this scenario, does it make sense to always return with 
an UNKNOWN_TOPIC_ID error? Sometimes partitions will be skipped over anyway 
when `mustRespond` is false, so should those also return UNKNOWN_TOPIC_ID?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572370494



##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -216,10 +217,10 @@ class ReplicaFetcherThread(name: String,
 try {
   val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
   val fetchResponse = 
clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
-  if (!fetchSessionHandler.handleResponse(fetchResponse)) {
+  if (!fetchSessionHandler.handleResponse(fetchResponse, 
fetchRequest.latestAllowedVersion())) {

Review comment:
   Good point, it should probably be along the lines of  `if 
(fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) 12 else 
fetchRequestVersion` to match how it is sent below.
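   
   As a tiny standalone helper, that logic would read (sketch, names as above):
   
   ```scala
   // Clamp to version 12 when this session cannot use topic IDs; otherwise
   // keep the latest allowed fetch version.
   def effectiveFetchVersion(fetchRequestVersion: Short, canUseTopicIds: Boolean): Short =
     if (fetchRequestVersion >= 13 && !canUseTopicIds) 12.toShort else fetchRequestVersion
   ```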





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572367849



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
- toForget: util.List[TopicPartition],
- reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+ fetchDataAndError: FetchDataAndError,
+ toForgetAndIds: ToForgetAndIds,
+ reqMetadata: JFetchMetadata,
+ topicIds: util.Map[String, Uuid],
+ topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
 val added = new TL
 val updated = new TL
 val removed = new TL
-fetchData.forEach { (topicPart, reqData) =>
-  val newCachedPart = new CachedPartition(topicPart, reqData)
+
+// Only make changes to topic IDs if we have a new request version.
+// If we receive an old request version, ignore all topic ID code, keep 
IDs that are there.
+if (version >= 13) {
+  val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else 
Errors.UNKNOWN_TOPIC_ID
+  val unresolvedIterator = unresolvedPartitions.iterator()
+  while (unresolvedIterator.hasNext()) {
+val partition = unresolvedIterator.next()
+
+// Remove from unresolvedPartitions if ID is unresolved in toForgetIds
+val forgetPartitions = 
toForgetAndIds.toForgetIds.get(partition.topicId)
+if (forgetPartitions != null && 
forgetPartitions.contains(partition.partition))
+  unresolvedIterator.remove()
+
+// Try to resolve ID, if there is a name for the given ID, place a 
CachedPartition in partitionMap
+// and remove from unresolvedPartitions.
+else if (topicNames.get(partition.topicId) != null) {
+  val newTp = new TopicPartition(topicNames.get(partition.topicId), 
partition.partition)
+  val newCp = new CachedPartition(newTp, partition.topicId, 
partition.reqData)
+  partitionMap.add(newCp)
+  added.add(newTp)
+  unresolvedIterator.remove()
+} else {
+  val idError = fetchDataAndError.idErrors.get(partition.topicId)

Review comment:
   This is for adding unresolved partitions that are in the session but not in 
the request. I can add comments to clarify what is happening.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572363402



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -270,24 +314,40 @@ public T records() {
  */
 public FetchResponse(Errors error,
  LinkedHashMap> 
responseData,
+ List idErrors,
+ Map topicIds,
  int throttleTimeMs,
  int sessionId) {
 super(ApiKeys.FETCH);
-this.data = toMessage(throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);
+this.data = toMessage(throttleTimeMs, error, 
responseData.entrySet().iterator(), idErrors, topicIds, sessionId);
 this.responseDataMap = responseData;
 }
 
 public FetchResponse(FetchResponseData fetchResponseData) {
 super(ApiKeys.FETCH);
 this.data = fetchResponseData;
-this.responseDataMap = toResponseDataMap(fetchResponseData);
+if (!supportsTopicIds()) {
+this.responseDataMap = toResponseDataMap(fetchResponseData);
+} else {
+this.responseDataMap = null;
+}
 }
 
 public Errors error() {
 return Errors.forCode(data.errorCode());
 }
 
-public LinkedHashMap<TopicPartition, PartitionData<T>> responseData() {
+public LinkedHashMap<TopicPartition, PartitionData<T>> responseData(Map<Uuid, String> topicNames) {
+if (!supportsTopicIds())
+return responseDataMap;
+return toResponseDataMap(data, topicNames);
+
+}
+
+// Used when we can guarantee responseData is populated with all possible 
partitions
+// This occurs when we have a response version < 13 or we built the 
FetchResponse with
+// responseDataMap as a parameter and we have the same topic IDs available.
+public LinkedHashMap<TopicPartition, PartitionData<T>> resolvedResponseData() {

Review comment:
   I agree. I think it stems from exactly what you said: 
`FetchContext.updateAndGenerateResponseData()` generates a response only for it 
to be generated again. Currently 
`FetchContext.updateAndGenerateResponseData()` does include all partitions 
(resolved and unresolved). The issue is that the partitions need to be 
down-converted. The way this works is that the partitions are pulled from the 
FetchResponse object itself. However, since I've changed responseData and this 
is the newest version of the response, it will try to reconstruct the map 
instead of pulling the `partitionData` object, which is too slow. I thought 
about changing the method to always return the map when it is not null, but 
that caused some issues in some places as well. I can look into this again 
though.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572357857



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.
+    // If we receive an old request version, ignore all topic ID code, keep IDs that are there.
+    if (version >= 13) {
+      val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID

Review comment:
   I've gone back and forth on this. On one hand, you are right that this is confusing in the case where we are doing an upgrade and ID propagation is delayed. On the other hand, in the non-upgrade case, returning an UNKNOWN_TOPIC_ID error when topic IDs are not even supported might not be as informative.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10043: MINOR: Add StorageTool as specified in KIP-631

2021-02-08 Thread GitBox


cmccabe merged pull request #10043:
URL: https://github.com/apache/kafka/pull/10043


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10043: MINOR: Add StorageTool as specified in KIP-631

2021-02-08 Thread GitBox


mumrah commented on a change in pull request #10043:
URL: https://github.com/apache/kafka/pull/10043#discussion_r572357477



##
File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
##
@@ -88,9 +88,9 @@ class RawMetaProperties(val props: Properties = new 
Properties()) {
   }
 
   override def toString: String = {
-"RawMetaProperties(" + 
props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
+"{" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {

Review comment:
   ok, sounds good





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572356813



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -110,7 +116,68 @@ public String toString() {
 }
 }
 
-private Optional<Integer> optionalEpoch(int rawEpochValue) {
+public static final class UnresolvedPartitions {

Review comment:
   The reason I named it this way is that we maintain such an object for each partition that was unresolved. If we simply had one object per topic, we would need a way to know all the partitions for the topic that were requested.
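
   To make the trade-off concrete, here is a small self-contained sketch (hypothetical shapes, using java.util.UUID as a stand-in for Kafka's Uuid) of the two layouts: one object per unresolved partition versus one entry per topic that must also carry its requested partitions.
   ```java
   import java.util.*;

   public class UnresolvedLayouts {
       // Option A: one object per unresolved partition (the approach here).
       static final class UnresolvedPartition {
           final UUID topicId;
           final int partition;
           UnresolvedPartition(UUID topicId, int partition) {
               this.topicId = topicId;
               this.partition = partition;
           }
       }

       public static void main(String[] args) {
           UUID id = UUID.randomUUID();
           // Option B: one entry per topic ID, which then has to track every
           // requested partition for that topic explicitly.
           Map<UUID, Set<Integer>> unresolvedByTopic = new HashMap<>();
           unresolvedByTopic.computeIfAbsent(id, k -> new HashSet<>()).add(0);
           unresolvedByTopic.computeIfAbsent(id, k -> new HashSet<>()).add(1);
           System.out.println(unresolvedByTopic);
       }
   }
   ```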





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572355958



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -265,9 +353,32 @@ public FetchRequestData build() {
             Map<TopicPartition, PartitionData> curSessionPartitions = copySessionPartitions
                 ? Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions))
                 : Collections.unmodifiableMap(sessionPartitions);
+            Map<String, Uuid> toSendTopicIds =
+                Collections.unmodifiableMap(new HashMap<>(sessionTopicIds));
+            Map<Uuid, String> toSendTopicNames =
+                Collections.unmodifiableMap(new HashMap<>(sessionTopicNames));
+            boolean canUseTopicIds = sessionPartitionsPerTopic.size() == toSendTopicIds.size();
+
             next = null;
+            topicIds = null;
+            topicNames = null;
+            partitionsPerTopic = null;
             return new FetchRequestData(toSend, Collections.unmodifiableList(removed),
-                curSessionPartitions, nextMetadata);
+                curSessionPartitions, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+        }
+
+        private void addPartitionsAndIds(Map<String, Set<Integer>> partitionsPerTopic,
+                                         TopicPartition tp, Uuid id, Map<String, Uuid> topicIds,
+                                         Map<Uuid, String> topicNames) {
+            if (partitionsPerTopic.containsKey(tp.topic())) {
+                partitionsPerTopic.get(tp.topic()).add(tp.partition());
+            } else {
+                partitionsPerTopic.put(tp.topic(), new HashSet<>(tp.partition()));

Review comment:
   Yeah. Good catch. I'm going to experiment with this code a bit to see if it's faster to maintain this set or just get a set of topics from the map of topic partitions in `FetchRequestData`.
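
   As a point of comparison, a small self-contained sketch (hypothetical, not this PR's code) of the second option: derive the topic set on demand from the partition map instead of maintaining a parallel structure that has to be kept in sync.
   ```java
   import java.util.*;
   import java.util.stream.Collectors;
   import org.apache.kafka.common.TopicPartition;

   public class TopicsFromPartitions {
       public static void main(String[] args) {
           Map<TopicPartition, Integer> sessionPartitions = new LinkedHashMap<>();
           sessionPartitions.put(new TopicPartition("foo", 0), 1);
           sessionPartitions.put(new TopicPartition("foo", 1), 1);
           sessionPartitions.put(new TopicPartition("bar", 0), 1);

           // One pass over the keys; no second structure to keep in sync.
           Set<String> topics = sessionPartitions.keySet().stream()
               .map(TopicPartition::topic)
               .collect(Collectors.toSet());
           System.out.println(topics); // e.g. [bar, foo]
       }
   }
   ```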





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572354491



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -73,6 +76,25 @@ public FetchSessionHandler(LogContext logContext, int node) {
     private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
         new LinkedHashMap<>(0);
 
+    /**
+     * All of the topic ids mapped to topic names for topics which exist in the fetch request session.
+     */
+    private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);

Review comment:
   I think I was just matching `sessionPartitions` above





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572351945



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -270,24 +314,40 @@ public T records() {
  */
     public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
+                         List<IdError> idErrors,

Review comment:
   Sounds good to me
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572351801



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -259,8 +262,49 @@ public T records() {
 }
 }
 
+    public static final class IdError {
+        private final Uuid id;
+        private final Set<Integer> partitions;
+        private final Errors error;
+
+        public IdError(Uuid id,
+                       List<Integer> partitions,
+                       Errors error) {
+            this.id = id;
+            this.partitions = new HashSet<>(partitions);
+            this.error = error;
+        }
+
+        public Uuid id() {
+            return this.id;
+        }
+
+        public Set<Integer> partitions() {
+            return this.partitions;
+        }
+
+        public void addPartitions(List<Integer> partitions) {
+            // Add to the field, not the parameter that shadows it.
+            partitions.forEach(partition -> this.partitions.add(partition));
+        }
+
+        private List<FetchResponseData.FetchablePartitionResponse> errorData() {
+            return partitions.stream().map(partition -> new FetchResponseData.FetchablePartitionResponse()
+                .setPartition(partition)
+                .setErrorCode(error.code())
+                .setHighWatermark(FetchResponse.INVALID_HIGHWATERMARK)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+                .setAbortedTransactions(null)
+                .setPreferredReadReplica(FetchResponse.INVALID_PREFERRED_REPLICA_ID)
+                .setRecordSet(MemoryRecords.EMPTY)).collect(Collectors.toList());
+        }
+
+    }
+
 /**
- * From version 3 or later, the entries in `responseData` should be in the same order as the entries in
+ * From version 3 or later, the 'interesting' entries in `responseData` should be in the same order as the entries in

Review comment:
   'interesting' was the name of the map of partitionData. I believe they 
are topic partitions that are authorized and exist.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572350818



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.

Review comment:
   Sorry this was unclear. I meant changes involving topic IDs. I will 
adjust this comment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10728) Mirroring data without decompressing with MirrorMaker 2.0

2021-02-08 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281352#comment-17281352
 ] 

Henry Cai commented on KAFKA-10728:
---

Please take a look at 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring]; 
this should help.

> Mirroring data without decompressing with MirrorMaker 2.0
> -
>
> Key: KAFKA-10728
> URL: https://issues.apache.org/jira/browse/KAFKA-10728
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Eazhilan Nagarajan
>Priority: Major
>
> Hello, 
>  
> I use MirrorMaker 2.0 to copy data across two Kafka clusters, and it's all 
> working fine. Recently we enabled compression when producing data into our 
> topics, which had a very positive impact on storage and other resources, but 
> while mirroring, the data seems to be decompressed at the target Kafka 
> cluster. I tried enabling compression using the below config in MM2, and the 
> data at the target cluster is compressed now, but the decompress-and-recompress 
> cycle continues to happen and eats up a lot of resources unnecessarily.
>  
> {noformat}
> - alias: my-passive-cluster
> authentication:
>   passwordSecret:
> password: password
> secretName: passive-cluster-secret
>   type: scram-sha-512
>   username: user-1
> bootstrapServers: my-passive-cluster.com:443
> config:
>   config.storage.replication.factor: 3
>   offset.storage.replication.factor: 3
>   status.storage.replication.factor: 3
>   producer.compression.type: gzip{noformat}
>  I found a couple of Jira issues talking about it, but I don't know if the 
> shallow iterator option is available now.
> https://issues.apache.org/jira/browse/KAFKA-732, 
> https://issues.apache.org/jira/browse/KAFKA-845
>  
> Kindly let me know if this is currently available or if it'll be available in 
> the future.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10043: MINOR: Add StorageTool as specified in KIP-631

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #10043:
URL: https://github.com/apache/kafka/pull/10043#discussion_r572340541



##
File path: core/src/main/scala/kafka/tools/StorageTool.scala
##
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.PrintStream
+import java.nio.file.{Files, Paths}
+
+import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, 
RawMetaProperties}
+import kafka.utils.{Exit, Logging}
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.mutable
+
+object StorageTool extends Logging {
+  def main(args: Array[String]): Unit = {
+try {
+  val parser = ArgumentParsers.
+newArgumentParser("kafka-storage").
+defaultHelp(true).
+description("The Kafka storage tool.")
+  val subparsers = parser.addSubparsers().dest("command")
+
+  val infoParser = subparsers.addParser("info").
+help("Get information about the Kafka log directories on this node.")
+  val formatParser = subparsers.addParser("format").
+help("Format the Kafka log directories on this node.")
+  subparsers.addParser("random-uuid").help("Print a random UUID.")
+  List(infoParser, formatParser).foreach(parser => {
+parser.addArgument("--config", "-c").
+  action(store()).
+  required(true).
+  help("The Kafka configuration file to use.")
+  })
+  formatParser.addArgument("--cluster-id", "-t").
+action(store()).
+required(true).
+help("The cluster ID to use.")
+  formatParser.addArgument("--ignore-formatted", "-g").
+action(storeTrue())
+
+  val namespace = parser.parseArgsOrFail(args)
+  val command = namespace.getString("command")
+  val config = Option(namespace.getString("config")).flatMap(
+    p => Some(new KafkaConfig(Utils.loadProps(p))))
+
+  command match {
+case "info" =>
+  val directories = configToLogDirectories(config.get)
+  val kip500Mode = configToKip500Mode(config.get)
+  Exit.exit(infoCommand(System.out, kip500Mode, directories))
+
+case "format" =>
+  val directories = configToLogDirectories(config.get)
+  val clusterId = namespace.getString("cluster_id")
+  val metaProperties = buildMetadataProperties(clusterId, config.get)
+  val ignoreFormatted = namespace.getBoolean("ignore_formatted")
+  if (!configToKip500Mode(config.get)) {
+throw new TerseFailure("The kafka configuration file appears to be 
for " +
+  "a legacy cluster. Formatting is only supported for kip-500 
clusters.")
+  }
+  Exit.exit(formatCommand(System.out, directories, metaProperties, 
ignoreFormatted ))
+
+case "random-uuid" =>
+  System.out.println(Uuid.randomUuid)
+  Exit.exit(0)
+
+case _ =>
+  throw new RuntimeException(s"Unknown command $command")
+  }
+} catch {
+  case e: TerseFailure =>
+System.err.println(e.getMessage)
+System.exit(1)
+}
+  }
+
+  def configToLogDirectories(config: KafkaConfig): Seq[String] = {
+val directories = new mutable.TreeSet[String]
+directories ++= config.logDirs
+Option(config.metadataLogDir).foreach(directories.add)
+directories.toSeq
+  }
+
+  def configToKip500Mode(config: KafkaConfig): Boolean = 
config.processRoles.nonEmpty
+
+  def infoCommand(stream: PrintStream, kip500Mode: Boolean, directories: 
Seq[String]): Int = {
+val problems = new mutable.ArrayBuffer[String]
+val foundDirectories = new mutable.ArrayBuffer[String]
+var prevMetadata: Option[RawMetaProperties] = None
+directories.sorted.foreach(directory => {
+  val directoryPath = Paths.get(directory)
+  if (!Files.isDirectory(directoryPath)) {
+if (!Files.exists(directoryPath)) {
+  problems += s"$directoryPath does not exist"
+} else {
+  problems += s"$directoryPath is not a directory"
+}
+  } 

[GitHub] [kafka] cmccabe commented on a change in pull request #10043: MINOR: Add StorageTool as specified in KIP-631

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #10043:
URL: https://github.com/apache/kafka/pull/10043#discussion_r572340025



##
File path: core/src/main/scala/kafka/tools/StorageTool.scala
##
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.PrintStream
+import java.nio.file.{Files, Paths}
+
+import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, 
RawMetaProperties}
+import kafka.utils.{Exit, Logging}
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.mutable
+
+object StorageTool extends Logging {
+  def main(args: Array[String]): Unit = {
+try {
+  val parser = ArgumentParsers.
+newArgumentParser("kafka-storage").
+defaultHelp(true).
+description("The Kafka storage tool.")
+  val subparsers = parser.addSubparsers().dest("command")
+
+  val infoParser = subparsers.addParser("info").
+help("Get information about the Kafka log directories on this node.")
+  val formatParser = subparsers.addParser("format").
+help("Format the Kafka log directories on this node.")
+  subparsers.addParser("random-uuid").help("Print a random UUID.")
+  List(infoParser, formatParser).foreach(parser => {
+parser.addArgument("--config", "-c").
+  action(store()).
+  required(true).
+  help("The Kafka configuration file to use.")
+  })
+  formatParser.addArgument("--cluster-id", "-t").
+action(store()).
+required(true).
+help("The cluster ID to use.")
+  formatParser.addArgument("--ignore-formatted", "-g").
+action(storeTrue())
+
+  val namespace = parser.parseArgsOrFail(args)
+  val command = namespace.getString("command")
+  val config = Option(namespace.getString("config")).flatMap(
+    p => Some(new KafkaConfig(Utils.loadProps(p))))
+
+  command match {
+case "info" =>
+  val directories = configToLogDirectories(config.get)
+  val kip500Mode = configToKip500Mode(config.get)
+  Exit.exit(infoCommand(System.out, kip500Mode, directories))
+
+case "format" =>
+  val directories = configToLogDirectories(config.get)
+  val clusterId = namespace.getString("cluster_id")
+  val metaProperties = buildMetadataProperties(clusterId, config.get)
+  val ignoreFormatted = namespace.getBoolean("ignore_formatted")
+  if (!configToKip500Mode(config.get)) {
+throw new TerseFailure("The kafka configuration file appears to be 
for " +
+  "a legacy cluster. Formatting is only supported for kip-500 
clusters.")
+  }
+  Exit.exit(formatCommand(System.out, directories, metaProperties, 
ignoreFormatted ))
+
+case "random-uuid" =>
+  System.out.println(Uuid.randomUuid)
+  Exit.exit(0)
+
+case _ =>
+  throw new RuntimeException(s"Unknown command $command")
+  }
+} catch {
+  case e: TerseFailure =>
+System.err.println(e.getMessage)
+System.exit(1)
+}
+  }
+
+  def configToLogDirectories(config: KafkaConfig): Seq[String] = {
+val directories = new mutable.TreeSet[String]
+directories ++= config.logDirs
+Option(config.metadataLogDir).foreach(directories.add)
+directories.toSeq
+  }
+
+  def configToKip500Mode(config: KafkaConfig): Boolean = 
config.processRoles.nonEmpty
+
+  def infoCommand(stream: PrintStream, kip500Mode: Boolean, directories: 
Seq[String]): Int = {
+val problems = new mutable.ArrayBuffer[String]
+val foundDirectories = new mutable.ArrayBuffer[String]
+var prevMetadata: Option[RawMetaProperties] = None
+directories.sorted.foreach(directory => {
+  val directoryPath = Paths.get(directory)
+  if (!Files.isDirectory(directoryPath)) {
+if (!Files.exists(directoryPath)) {
+  problems += s"$directoryPath does not exist"
+} else {
+  problems += s"$directoryPath is not a directory"
+}
+  } 

[GitHub] [kafka] cmccabe commented on a change in pull request #10043: MINOR: Add StorageTool as specified in KIP-631

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #10043:
URL: https://github.com/apache/kafka/pull/10043#discussion_r572338881



##
File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
##
@@ -88,9 +88,9 @@ class RawMetaProperties(val props: Properties = new 
Properties()) {
   }
 
   override def toString: String = {
-"RawMetaProperties(" + 
props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
+"{" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {

Review comment:
   It doesn't produce JSON (for example, keys and values are not quoted).
   
   This is intended for human consumption.  It will show up in the command-line 
output.
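
   For illustration (the values here are hypothetical), the sorted, human-readable output looks something like:
   ```
   {broker.id=1, cluster.id=XhRBpEfnQ2-Q9hyUQfKB1g, version=1}
   ```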





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10047: MINOR: Add ClusterTool as specified in KIP-631

2021-02-08 Thread GitBox


cmccabe commented on pull request #10047:
URL: https://github.com/apache/kafka/pull/10047#issuecomment-775412214


   Committed to 2.8



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10047: MINOR: Add ClusterTool as specified in KIP-631

2021-02-08 Thread GitBox


cmccabe merged pull request #10047:
URL: https://github.com/apache/kafka/pull/10047


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-02-08 Thread GitBox


junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r568140154



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -37,7 +38,10 @@ object FetchSession {
   type REQ_MAP = util.Map[TopicPartition, FetchRequest.PartitionData]
   type RESP_MAP = util.LinkedHashMap[TopicPartition, 
FetchResponse.PartitionData[Records]]
   type CACHE_MAP = ImplicitLinkedHashCollection[CachedPartition]
+  type UNRESOLVED_CACHE = util.HashSet[CachedUnresolvedPartition]
   type RESP_MAP_ITER = util.Iterator[util.Map.Entry[TopicPartition, 
FetchResponse.PartitionData[Records]]]
+  type TOPIC_ID_MAP = util.Map[String,Uuid]

Review comment:
   space after comma

##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -187,24 +205,86 @@ class CachedPartition(val topic: String,
   }
 }
 
+/**
+ * Very similar to CachedPartition above, CachedUnresolvedPartition is used 
for incremental fetch requests.
+ * These objects store partitions that had topic IDs that could not be 
resolved by the broker.
+ *
+ * Upon each incremental request in the session, these partitions will be 
loaded. They can either be removed
+ * through resolving the partition with the broker's topicNames map or by 
receiving an unresolved toForget ID.
+ *
+ * Since these partitions signify an error, they will always be returned in 
the response.
+ */
+
+class CachedUnresolvedPartition(val topicId: Uuid,
+  val partition: Int,
+  var maxBytes: Int,
+  var fetchOffset: Long,
+  var leaderEpoch: Optional[Integer],
+  var fetcherLogStartOffset: Long,
+  var lastFetchedEpoch: Optional[Integer]) {
+
+  def this(id: Uuid, partition: Int) =
+this(id, partition, -1, -1, Optional.empty(), -1, Optional.empty[Integer])
+
+  def this(id: Uuid, partition: Int, reqData: FetchRequest.PartitionData) =
+this(id, partition, reqData.maxBytes, reqData.fetchOffset,
+  reqData.currentLeaderEpoch, reqData.logStartOffset, 
reqData.lastFetchedEpoch)
+
+  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+
+  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+// Update our cached request parameters.
+maxBytes = reqData.maxBytes
+fetchOffset = reqData.fetchOffset
+fetcherLogStartOffset = reqData.logStartOffset
+leaderEpoch = reqData.currentLeaderEpoch
+lastFetchedEpoch = reqData.lastFetchedEpoch
+  }
+
+  override def hashCode: Int = (31 * partition) + topicId.hashCode
+
+  def canEqual(that: Any) = that.isInstanceOf[CachedUnresolvedPartition]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CachedUnresolvedPartition =>
+this.eq(that) ||
+  (that.canEqual(this) &&
+this.partition.equals(that.partition) &&
+this.topicId.equals(that.topicId))
+  case _ => false
+}
+
+  override def toString: String = synchronized {
+"CachedPartition(Id=" + topicId +
+  ", partition=" + partition +
+  ", maxBytes=" + maxBytes +
+  ", fetchOffset=" + fetchOffset +
+  ", fetcherLogStartOffset=" + fetcherLogStartOffset +
+  ")"
+  }
+}
+
 /**
   * The fetch session.
   *
   * Each fetch session is protected by its own lock, which must be taken 
before mutable
   * fields are read or modified.  This includes modification of the session 
partition map.
   *
-  * @param id   The unique fetch session ID.
-  * @param privileged   True if this session is privileged.  Sessions crated 
by followers
-  * are privileged; sesssion created by consumers are not.
-  * @param partitionMap The CachedPartitionMap.
-  * @param creationMs   The time in milliseconds when this session was created.
-  * @param lastUsedMs   The last used time in milliseconds.  This should only 
be updated by
-  * FetchSessionCache#touch.
-  * @param epochThe fetch session sequence number.
+  * @param id The unique fetch session ID.
+  * @param privileged True if this session is privileged.  Sessions created by followers
+  *   are privileged; sessions created by consumers are not.
+  * @param partitionMap   The CachedPartitionMap.
+ *  @param unresolvedPartitions   The CachedUnresolvedPartitionMap
+  * @param creationMs The time in milliseconds when this session 
was created.
+  * @param lastUsedMs The last used time in milliseconds.  This 
should only be updated by
+  *   FetchSessionCache#touch.
+  * @param epoch  The fetch session sequence number.
   */
 class FetchSession(val id: Int,
val privileged: Boolean,
val partitionMap: 

[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


mumrah commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572310749



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+ZK,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in 
this instance through this accessor will
+ * have no effect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or 
by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the 
default security protocol PLAINTEXT
+ */
+ListenerName listener();
+
+/**
+ * The broker connect string which can be used by clients for bootstrapping
+ */
+String brokerList();
+
+/**
+ * A collection of all brokers in the cluster. In ZK-based clusters this 
will also include the broker which is
+ * acting as the controller (since ZK controllers serve both broker and 
controller roles).
+ */
+Collection<SocketServer> brokerSocketServers();
+
+/**
+ * A collection of all controllers in the cluster. For ZK-based clusters, 
this will return the broker which is also
+ * currently the active controller. For Raft-based clusters, this will 
return all controller servers.
+ */
+Collection<SocketServer> controllerSocketServers();
+
+/**
+ * Any one of the broker servers.
+ */
+Optional<SocketServer> anyBrokerSocketServer();

Review comment:
   Yea we can ditch the Optional for these





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572310383



##
File path: 
core/src/test/scala/integration/kafka/server/IntegrationTestHelper.scala
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import kafka.network.SocketServer
+import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
RequestHeader, RequestTestUtils, ResponseHeader}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Utils
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.Properties
+import scala.annotation.nowarn
+import scala.reflect.ClassTag
+
+class IntegrationTestHelper {

Review comment:
   These are all static methods, right?  Can we just make this an "object" 
rather than a class and have something like `object IntegrationTestUtils` ?
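
   In Java terms the suggestion amounts to the usual static-only utility holder; a minimal sketch (hypothetical helper body, the real methods live in this PR):
   ```java
   // Hypothetical Java analogue of the Scala `object` suggestion: all-static
   // helpers with no instances and no per-test state.
   public final class IntegrationTestUtils {
       private IntegrationTestUtils() {
           // no instances
       }

       public static String bootstrapServers(String host, int port) {
           return host + ":" + port;
       }
   }
   ```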





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572309452



##
File path: core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
##
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import integration.kafka.server.IntegrationTestHelper;
+import kafka.api.IntegrationTestHarness;
+import kafka.network.SocketServer;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.compat.java8.OptionConverters;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test 
invocation. Each instance of this
+ * class is provided with a configuration for the cluster.
+ *
+ * This context also provides parameter resolvers for:
+ *
+ * 
+ * ClusterConfig (the same instance passed to the constructor)
+ * ClusterInstance (includes methods to expose underlying 
SocketServer-s)
+ * IntegrationTestHelper (helper methods)
+ * 
+ */
+public class ZkClusterInvocationContext implements 
TestTemplateInvocationContext {
+
+private final ClusterConfig clusterConfig;
+private final AtomicReference<IntegrationTestHarness> clusterReference;
+
+public ZkClusterInvocationContext(ClusterConfig clusterConfig) {
+this.clusterConfig = clusterConfig;
+this.clusterReference = new AtomicReference<>();
+}
+
+@Override
+public String getDisplayName(int invocationIndex) {
+String clusterDesc = clusterConfig.nameTags().entrySet().stream()
+.map(Object::toString)
+.collect(Collectors.joining(", "));
+return String.format("[Zk %d] %s", invocationIndex, clusterDesc);
+}
+
+@Override
+public List<Extension> getAdditionalExtensions() {
+ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, 
clusterReference);
+return Arrays.asList(
+(BeforeTestExecutionCallback) context -> {
+// We have to wait to actually create the underlying cluster 
until after our @BeforeEach methods
+// have run. This allows tests to set up external dependencies 
like ZK, MiniKDC, etc.
+// However, since we cannot create this instance until we are 
inside the test invocation, we have
+// to use a container class (AtomicReference) to provide this 
cluster object to the test itself
+
+// This is what tests normally extend from to start a cluster, 
here we create it anonymously and
+// configure the cluster using values from ClusterConfig
+IntegrationTestHarness cluster = new IntegrationTestHarness() {
+@Override
+public Properties serverConfig() {
+return clusterConfig.serverProperties();
+}
+
+@Override
+public Properties adminClientConfig() {
+return clusterConfig.adminClientProperties();
+}
+
+@Override
+public Properties consumerConfig() {
+return clusterConfig.consumerProperties();
+}
+
+@Override
+public Properties producerConfig() {
+return 

[GitHub] [kafka] d8tltanc commented on pull request #9865: Fill in the 2.8 release note for Authorizer

2021-02-08 Thread GitBox


d8tltanc commented on pull request #9865:
URL: https://github.com/apache/kafka/pull/9865#issuecomment-775382411


   Hi @rajinisivaram. Thanks for the comments. I've addressed all of them. Could you take a look and approve it? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572291568



##
File path: core/src/test/java/kafka/test/annotation/AutoStart.java
##
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+public enum AutoStart {

Review comment:
   capitalize enum names please.  also, do we need this enum?  Why does 
DEFAULT need to be distinct from YES?  Once we have a lot of tests depending on 
the default being "on" I can't see us changing this (or how having 3 values 
would help us change it if so...)
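
   For what it's worth, a small sketch (hypothetical names, not this PR's code) of the case where a tri-state pays off: a per-test DEFAULT can defer to a class-level setting without the framework having to guess whether the test author set anything explicitly.
   ```java
   enum AutoStart { YES, NO, DEFAULT }

   final class AutoStartResolver {
       // DEFAULT defers to the class-level setting; YES/NO are explicit.
       static boolean resolve(AutoStart perTest, boolean classLevelDefault) {
           switch (perTest) {
               case YES: return true;
               case NO: return false;
               default: return classLevelDefault;
           }
       }

       public static void main(String[] args) {
           System.out.println(resolve(AutoStart.DEFAULT, true));  // true
           System.out.println(resolve(AutoStart.NO, true));       // false
       }
   }
   ```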





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572290904



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+Zk,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in 
this instance through this accessor will
+ * have no effect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or 
by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the 
default security protocol PLAINTEXT
+ */
+ListenerName listener();
+
+/**
+ * The broker connect string which can be used by clients for bootstrapping
+ */
+String brokerList();
+
+/**
+ * A collection of all brokers in the cluster. In ZK-based clusters this 
will also include the broker which is
+ * acting as the controller (since ZK controllers serve both broker and 
controller roles).
+ */
+Collection<SocketServer> brokers();
+
+/**
+ * A collection of all controllers in the cluster. For ZK-based clusters, 
this will return the broker which is also
+ * currently the active controller. For Raft-based clusters, this will 
return all controller servers.
+ */
+Collection<SocketServer> controllers();
+
+/**
+ * Any one of the broker servers.
+ */
+Optional<SocketServer> anyBroker();
+
+/**
+ * Any one of the controller servers.
+ */
+Optional<SocketServer> anyController();
+
+/**
+ * The underlying object which is responsible for setting up and tearing 
down the cluster.
+ */
+Object getUnderlying();
+
+default <T> T getUnderlying(Class<T> asClass) {
+return asClass.cast(getUnderlying());
+}
+
+Admin createAdminClient(Properties configOverrides);
+
+default Admin createAdminClient() {
+return createAdminClient(new Properties());
+}
+
+void start();
+
+void stop();

Review comment:
   ok





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572290383



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+ZK,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in 
this instance through this accessor will
+ * have no affect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or 
by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the 
default security protocol PLAINTEXT
+ */
+ListenerName listener();
+
+/**
+ * The broker connect string which can be used by clients for bootstrapping
+ */
+String brokerList();
+
+/**
+ * A collection of all brokers in the cluster. In ZK-based clusters this 
will also include the broker which is
+ * acting as the controller (since ZK controllers serve both broker and 
controller roles).
+ */
+Collection<SocketServer> brokerSocketServers();
+
+/**
+ * A collection of all controllers in the cluster. For ZK-based clusters, 
this will return the broker which is also
+ * currently the active controller. For Raft-based clusters, this will 
return all controller servers.
+ */
+Collection<SocketServer> controllerSocketServers();
+
+/**
+ * Any one of the broker servers.
+ */
+Optional<SocketServer> anyBrokerSocketServer();
+
+/**
+ * Any one of the controller servers.
+ */
+Optional<SocketServer> anyControllerSocketServer();

Review comment:
   Same question... can we just throw an exception if there are no 
controller socket servers?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572290152



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+ZK,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in 
this instance through this accessor will
+ * have no effect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or 
by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the 
default security protocol PLAINTEXT
+ */
+ListenerName listener();
+
+/**
+ * The broker connect string which can be used by clients for bootstrapping
+ */
+String brokerList();
+
+/**
+ * A collection of all brokers in the cluster. In ZK-based clusters this 
will also include the broker which is
+ * acting as the controller (since ZK controllers serve both broker and 
controller roles).
+ */
+Collection<SocketServer> brokerSocketServers();
+
+/**
+ * A collection of all controllers in the cluster. For ZK-based clusters, 
this will return the broker which is also
+ * currently the active controller. For Raft-based clusters, this will 
return all controller servers.
+ */
+Collection<SocketServer> controllerSocketServers();
+
+/**
+ * Any one of the broker servers.
+ */
+Optional<SocketServer> anyBrokerSocketServer();

Review comment:
   Why does this return an `Optional<>`?  Zero-node clusters seem like a 
corner case we don't really care about (we can just throw an exception 
there)
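
   A minimal sketch (hypothetical, not this PR's code) of the non-Optional alternative: return a server directly and fail fast on the empty-cluster corner case.
   ```java
   import java.util.Collection;
   import java.util.NoSuchElementException;

   final class ClusterAccess {
       static <T> T anyOrThrow(Collection<T> servers) {
           return servers.stream()
               .findFirst()
               .orElseThrow(() -> new NoSuchElementException("cluster has no servers"));
       }
   }
   ```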





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572289080



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+ZK,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in 
this instance through this accessor will
+ * have no effect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or 
by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the 
default security protocol PLAINTEXT
+ */
+ListenerName listener();
+
+/**
+ * The broker connect string which can be used by clients for bootstrapping
+ */
+String brokerList();

Review comment:
   I realize `brokerList` is the existing name, but given that we've 
standardized on `--bootstrap-server` for the command-line argument, maybe it 
makes sense to use that name here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572286453



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+ZK,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will
+ * have no effect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT.
+ */

Review comment:
   How about naming these "clientSecurityProtocol", "clientListenerName", 
etc. to reflect the fact that the clients will be using these security 
protocols, listener names, etc.  Then we can add more listeners later for 
specific tests without doing a big rename.
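
   For illustration, the renamed accessors might read as follows (a sketch; the exact names are placeholders):
   ```java
   /** Security protocol that clients should use when connecting to this cluster. */
   SecurityProtocol clientSecurityProtocol();

   /** Listener that clients should connect to; falls back to the default PLAINTEXT listener if unspecified. */
   ListenerName clientListenerName();
   ```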





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12305) Flatten SMT fails on arrays

2021-02-08 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281295#comment-17281295
 ] 

Randall Hauch commented on KAFKA-12305:
---

{quote}
Do you have thoughts on whether elements inside arrays should themselves be 
flattened? 
{quote}

I think it's arguable whether Flatten should flatten structs/maps *with* 
arrays. My initial thought is that it should not because the documentation does 
not mention arrays, and because it's not clear to me how often that case will 
come up. 

I'm definitely open to suggestions from others, though!


> Flatten SMT fails on arrays
> ---
>
> Key: KAFKA-12305
> URL: https://issues.apache.org/jira/browse/KAFKA-12305
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1, 
> 2.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> The {{Flatten}} SMT fails for array types. A sophisticated approach that 
> tries to flatten arrays might be desirable in some cases, and may have been 
> punted during the early design phase of the transform, but in the interim, 
> it's probably not worth it to make array data and the SMT mutually exclusive.
> A naive approach that preserves arrays as they are and doesn't attempt to flatten 
> them seems fair for now, but one alternative could be to traverse array 
> elements and, if any are maps or structs, flatten those as well.
> Adding behavior to fully flatten arrays by essentially transforming them into 
> maps whose elements are the elements of the array and whose keys are the 
> indices of each element is likely out of scope for a bug fix and, although 
> useful, might have to wait for a KIP.
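
As a rough sketch of the naive pass-through approach described above (a hypothetical helper for the schemaless path, not the actual patch):

{code:java}
import java.util.Map;

// Recursively flatten nested Maps; array (List) values are copied through
// unchanged instead of being flattened or rejected.
static void applyFlatten(String prefix, Map<String, Object> source, Map<String, Object> target) {
    for (Map.Entry<String, Object> entry : source.entrySet()) {
        String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
        Object value = entry.getValue();
        if (value instanceof Map) {
            @SuppressWarnings("unchecked")
            Map<String, Object> nested = (Map<String, Object>) value;
            applyFlatten(key, nested, target);
        } else {
            target.put(key, value); // arrays land in the output as-is
        }
    }
}
{code}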



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572284766



##
File path: core/src/test/java/kafka/test/ClusterConfig.java
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Represents a requested configuration of a Kafka cluster for integration testing
+ */
+public class ClusterConfig {
+
+private final Type type;
+private final int brokers;
+private final int controllers;
+private final String name;
+private final boolean autoStart;
+
+private final String securityProtocol;
+private final String listenerName;
+private final File trustStoreFile;
+
+private final Properties serverProperties = new Properties();
+private final Properties producerProperties = new Properties();
+private final Properties consumerProperties = new Properties();
+private final Properties adminClientProperties = new Properties();
+private final Properties saslServerProperties = new Properties();
+private final Properties saslClientProperties = new Properties();
+
+ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
+  String securityProtocol, String listenerName, File trustStoreFile) {
+this.type = type;
+this.brokers = brokers;
+this.controllers = controllers;
+this.name = name;
+this.autoStart = autoStart;
+this.securityProtocol = securityProtocol;
+this.listenerName = listenerName;
+this.trustStoreFile = trustStoreFile;
+}
+
+public Type clusterType() {
+return type;
+}
+
+public int brokers() {
+return brokers;
+}
+
+public int controllers() {
+return controllers;

Review comment:
   What does this return in ZK mode?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572284631



##
File path: core/src/test/java/kafka/test/ClusterConfig.java
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Represents a requested configuration of a Kafka cluster for integration testing
+ */
+public class ClusterConfig {
+
+private final Type type;
+private final int brokers;
+private final int controllers;
+private final String name;
+private final boolean autoStart;
+
+private final String securityProtocol;
+private final String listenerName;
+private final File trustStoreFile;
+
+private final Properties serverProperties = new Properties();
+private final Properties producerProperties = new Properties();
+private final Properties consumerProperties = new Properties();
+private final Properties adminClientProperties = new Properties();
+private final Properties saslServerProperties = new Properties();
+private final Properties saslClientProperties = new Properties();
+
+ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
+  String securityProtocol, String listenerName, File trustStoreFile) {
+this.type = type;
+this.brokers = brokers;
+this.controllers = controllers;
+this.name = name;
+this.autoStart = autoStart;
+this.securityProtocol = securityProtocol;
+this.listenerName = listenerName;
+this.trustStoreFile = trustStoreFile;
+}
+
+public Type clusterType() {
+return type;
+}
+
+public int brokers() {

Review comment:
   let's use "numBrokers" , etc. to reflect the fact that we're not 
returning the actual brokers.  Same for "controllers" 
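
   For illustration, the rename might read (sketch only):
   ```java
   public int numBrokers() {
       return brokers;
   }

   public int numControllers() {
       return controllers;
   }
   ```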





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-08 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r572284053



##
File path: core/src/test/java/kafka/test/ClusterConfig.java
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Represents a requested configuration of a Kafka cluster for integration testing
+ */
+public class ClusterConfig {
+
+private final Type type;
+private final int brokers;
+private final int controllers;
+private final String name;
+private final boolean autoStart;
+
+private final String securityProtocol;

Review comment:
   If there's no reason to use a string, then let's just use the `SecurityProtocol` enum.
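
   Sketched out, the field could carry the enum directly (illustrative only, assuming the `SecurityProtocol` import already present in this file):
   ```java
   private final SecurityProtocol securityProtocol;

   public SecurityProtocol securityProtocol() {
       return securityProtocol;
   }
   ```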





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



