[GitHub] [kafka] showuon commented on pull request #9690: KAFKA-10017: fix flaky EOS-beta upgrade test

2020-12-16 Thread GitBox


showuon commented on pull request #9690:
URL: https://github.com/apache/kafka/pull/9690#issuecomment-747243284


   @mjsax , I further investigated the issue I found last week:
   > It's because the keys in the stream store are sometimes empty, and that's why the following computation based on that variable is wrong. Here are the logs I got (they map to the code here, which is in phase 6):
   ```
   keysFirstClientBeta is []
   keysSecondClientAlpha is [1, 3]
   ```
   I finally found the root cause: **the stream has not completed the stable-assignment rebalancing** during `keysFromInstance(streams)`. Echoing the discussion in my other PR, https://github.com/apache/kafka/pull/9733#discussion_r543859937, we did "cut off" the unstable rebalances via `assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);`, but with this we can only make sure the assignment is complete and stable now; the streams haven't completed the REBALANCING yet. The workflow is like this:
   
   Coordinator finished stable assignment of tasks -> notify tasks -> task handles the new assignment -> **stream thread** changes state from RUNNING to PARTITIONS_ASSIGNED -> **stream client** changes state from RUNNING to REBALANCING -> **stream thread** changes state from PARTITIONS_ASSIGNED to RUNNING -> **stream client** changes state from REBALANCING to RUNNING
   
   And all that `assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);` guarantees is that step 1, `Coordinator finished stable assignment of tasks`, has completed. Also remember that the current stream state is still `RUNNING`, which passes the subsequent check (`waitForRunning()`). The list is empty because the 1st, unstable assignment given to the stream is empty:
   
   ```
   2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger] [2020-12-17 14:07:32,331] INFO [Consumer clientId=appDir1-StreamThread-1-consumer, groupId=appId-1] Updating assignment with
   2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]  Assigned partitions:   []
   2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]  Current owned partitions:  []
   2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]  Added partitions (assigned - owned):   []
   2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]  Revoked partitions (owned - assigned): []
   2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:393)
   ```
   
   So, that's why we got the empty key list from the stream store.
   
   As I mentioned in #9733, after `assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);` I think we should explicitly wait for a specific transition pair, e.g. [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)], instead of waiting for the RUNNING state only. Also, as you said, more than 2 rebalances might happen after a stream starts, so I think we can count the `onAssignmentComplete` calls after `prepareForRebalance` (unstable + stable count). Once the stable assignment completes, we then know exactly how many rebalances happened and can check the content of the state-transition list, e.g. with 2 assignments we can check that the transition list contains 2 REBALANCING values and that the last one is the RUNNING state, etc.
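   
   A minimal sketch of what such a wait could look like, assuming a custom `StateListener` that records every `(oldState, newState)` pair (the listener class and method names below are made up for illustration, not the actual test utilities):
   ```java
   import java.util.List;
   import java.util.concurrent.CopyOnWriteArrayList;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.KafkaStreams.State;
   import org.apache.kafka.streams.KeyValue;
   
   // Records every state transition so a test can assert on the exact
   // sequence instead of only polling for the RUNNING state.
   class TransitionRecordingListener implements KafkaStreams.StateListener {
       final List<KeyValue<State, State>> transitions = new CopyOnWriteArrayList<>();
   
       @Override
       public void onChange(final State newState, final State oldState) {
           transitions.add(KeyValue.pair(oldState, newState));
       }
   
       // True once the recorded tail is RUNNING -> REBALANCING -> RUNNING,
       // i.e. the client really went through (and finished) a rebalance.
       boolean wentThroughRebalance() {
           final int n = transitions.size();
           return n >= 2
               && transitions.get(n - 2).equals(KeyValue.pair(State.RUNNING, State.REBALANCING))
               && transitions.get(n - 1).equals(KeyValue.pair(State.REBALANCING, State.RUNNING));
       }
   }
   ```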
   
   Anyway, those are my findings; I wanted to share them with you. I'll update my PR #9733 (maybe next week, since I'm a little busy these days). Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

2020-12-16 Thread GitBox


chia7712 commented on a change in pull request #9754:
URL: https://github.com/apache/kafka/pull/9754#discussion_r544844791



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
+            StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition()
+                .setTopic(topicEntry.getKey())
+                .setPartitions(topicEntry.getValue());
+            topicAssignments.add(topicPartition);
+        }
+        StickyAssignorUserData data = new StickyAssignorUserData()
+            .setPreviousAssignment(topicAssignments);
+        if (version >= 1) {
+            memberData.generation.ifPresent(data::setGeneration);
         }
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        if (memberData.generation.isPresent())
-            struct.set(GENERATION_KEY_NAME, memberData.generation.get());
-        ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
-        STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
-        buffer.flip();
-        return buffer;
+        return MessageUtil.toVersionPrefixedByteBuffer(version, data);
     }
 
-    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) {
-        Struct struct;
-        ByteBuffer copy = buffer.duplicate();
+    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer, short version) {
+        StickyAssignorUserData data;
         try {
-            struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer);
+            data = new StickyAssignorUserData(new ByteBufferAccessor(buffer), version);

Review comment:
   > I think we needn't keep this backward compatibility since this code is only executed on the client.
   
   It seems to me that is not allowed, since (consumer) clients running different (Kafka) versions should be able to work together. Please take a look at https://cwiki.apache.org/confluence/display/KAFKA/KIP-341%3A+Update+Sticky+Assignor%27s+User+Data+Protocol to see how Kafka considers backward/forward compatibility.
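   
   To make the rolling-upgrade concern concrete, here is a hedged sketch of the kind of fallback a client needs when old and new consumers share a group (illustrative only, not the actual `StickyAssignor` code; `deserializeVersioned` and `deserializeLegacy` are hypothetical helpers):
   ```java
   import java.nio.ByteBuffer;
   
   static MemberData deserialize(ByteBuffer buffer) {
       try {
           // New format: the payload is prefixed with a version short.
           short version = buffer.getShort();
           return deserializeVersioned(buffer, version);
       } catch (Exception e) {
           // Members still on the old code send the legacy Struct-encoded
           // format, so fall back instead of failing the whole rebalance.
           buffer.rewind();
           return deserializeLegacy(buffer);
       }
   }
   ```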





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongxuwang opened a new pull request #9763: MINOR: Use ApiUtils' methods static imported consistently.

2020-12-16 Thread GitBox


dongxuwang opened a new pull request #9763:
URL: https://github.com/apache/kafka/pull/9763


   Since the static-import form of ApiUtils' methods is already in use, it is not necessary to import the whole ApiUtils class as well; using the static imports consistently also keeps the code uniform.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

2020-12-16 Thread GitBox


dengziming commented on a change in pull request #9754:
URL: https://github.com/apache/kafka/pull/9754#discussion_r544811722



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {

Review comment:
   Because `topicAssignments` puts all partitions of a topic into one field, I think the name `TopicPartition` is misleading, so I changed it to `TopicPartitions`. The `TopicPartition` in `ConsumerProtocolAssignment.json` and `ConsumerProtocolSubscription.json` is also misleading; we could create a new PR to alter them.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

2020-12-16 Thread GitBox


dengziming commented on a change in pull request #9754:
URL: https://github.com/apache/kafka/pull/9754#discussion_r544818885



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
+            StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition()
+                .setTopic(topicEntry.getKey())
+                .setPartitions(topicEntry.getValue());
+            topicAssignments.add(topicPartition);
+        }
+        StickyAssignorUserData data = new StickyAssignorUserData()
+            .setPreviousAssignment(topicAssignments);
+        if (version >= 1) {
+            memberData.generation.ifPresent(data::setGeneration);
         }
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        if (memberData.generation.isPresent())
-            struct.set(GENERATION_KEY_NAME, memberData.generation.get());
-        ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
-        STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
-        buffer.flip();
-        return buffer;
+        return MessageUtil.toVersionPrefixedByteBuffer(version, data);
     }
 
-    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) {
-        Struct struct;
-        ByteBuffer copy = buffer.duplicate();
+    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer, short version) {
+        StickyAssignorUserData data;
         try {
-            struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer);
+            data = new StickyAssignorUserData(new ByteBufferAccessor(buffer), version);

Review comment:
   Yes, the previous serialization and deserialization carry no version, so the code isn't graceful. I think we needn't keep this backward compatibility since this code is only executed on the client.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] g1geordie commented on pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread

2020-12-16 Thread GitBox


g1geordie commented on pull request #9707:
URL: https://github.com/apache/kafka/pull/9707#issuecomment-747188758


   @ijuma hello,
   
   The `close` method wants to prevent a self-join, but there is no deadlock there.
   
   Calling `flush` inside a callback, however, does deadlock. The deadlock happens because `flush` waits for the `sender` to send all messages, but a message is only marked done after its callback completes. **So if `flush` is called inside a callback, that callback will never complete.**
   
   `flush`'s semantics are "send out all buffered messages", so I think waiting for all messages to be sent is necessary.
   
   Marking messages done only after the callback finishes is, I think, optional; alternatively we could make the `callback` asynchronous, so messages are marked done regardless of whether the `callback` has completed.
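   
   A minimal sketch of the deadlock pattern (broker address and topic name are made up; this illustrates the problem, not code from the PR):
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   
   public class FlushInCallbackDeadlock {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   
           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
               producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                   // Runs on the sender thread. flush() waits for this very record
                   // to be marked done, but it is only marked done after this
                   // callback returns -> the callback never completes.
                   producer.flush();
               });
               producer.flush(); // the outer flush now also blocks forever
           }
       }
   }
   ```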
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9760: KAFKA-10850: Use 'Int.box' to replace deprecated 'new Integer' from BrokerToControllerRequestThreadTest

2020-12-16 Thread GitBox


chia7712 commented on pull request #9760:
URL: https://github.com/apache/kafka/pull/9760#issuecomment-747184218


   @govi20 It seems we don't need to box the primitive type, as the Scala compiler completes the conversion for us, right?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-12-16 Thread GitBox


dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544782830



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
   Thank you for your suggestions; I now understand the solution. I removed the topicId in `removePartitionInfo`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest

2020-12-16 Thread GitBox


chia7712 commented on a change in pull request #9761:
URL: https://github.com/apache/kafka/pull/9761#discussion_r544768525



##
File path: clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
##
@@ -120,4 +121,26 @@ public void iteratorRaisesOnTooLargeRecords() {
         logInputStream.nextBatch();
     }
 
+    @Test
+    public void testReadUnsignedIntFromInputStream() {

Review comment:
   Could you move the tests to ```ByteBufferInputStreamTest```? If there is no such test class, feel free to create a new one.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest

2020-12-16 Thread GitBox


chia7712 commented on pull request #9761:
URL: https://github.com/apache/kafka/pull/9761#issuecomment-747165929


   @bertber Please take a look at the original PR (https://github.com/apache/kafka/pull/3752/files#diff-ae627decd5dd27d053a7a9b051860f60df6b89c8884e50df2f9ae0de6a4645e5R40). It would be better to also fix the `len == 0` scenario.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9750: MINOR: Change toArray usage for Increase efficiency

2020-12-16 Thread GitBox


chia7712 commented on pull request #9750:
URL: https://github.com/apache/kafka/pull/9750#issuecomment-747165373


   @APaMio Could you rebase the code to include the recent big commits?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10861) Flaky test `TransactionsTest.testFencingOnSendOffsets`

2020-12-16 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10861.
-
Resolution: Fixed

> Flaky test `TransactionsTest.testFencingOnSendOffsets`
> --
>
> Key: KAFKA-10861
> URL: https://issues.apache.org/jira/browse/KAFKA-10861
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> {code}
> org.scalatest.exceptions.TestFailedException: Got an unexpected exception 
> from a fenced producer.
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>   at org.scalatest.Assertions.fail(Assertions.scala:1107)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1103)
>   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>   at 
> kafka.api.TransactionsTest.testFencingOnSendOffsets(TransactionsTest.scala:373)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at Caused by: 
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch (producerId=0, epoch=0)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #9762: KAFKA-10861; Fix race condition in flaky test `testFencingOnSendOffsets`

2020-12-16 Thread GitBox


hachikuji merged pull request #9762:
URL: https://github.com/apache/kafka/pull/9762


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10862) kafak stream consume from the earliest by default

2020-12-16 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250720#comment-17250720
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10862:


Hey [~rebekkaxi], you're looking at the defaults for two different things. The docs you linked to refer to the consumer client configs, which do default to "latest". You can find the config definition in [ConsumerConfig|https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L434].

The code you're looking at corresponds to the configs for Kafka Streams. Internally, Streams embeds a consumer client and overrides the default auto.offset.reset policy to "earliest". You can find the docs for the Streams-specific configs and how they override the client defaults [here|https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values].

So the default for Kafka Streams is indeed different from the default reset policy for a plain consumer app, the reason being that Streams generally wants to start from the beginning in order to process all available data. Why the default for the plain consumer is "latest" is another question :)
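
For anyone who wants the plain-consumer behavior in Streams, here is a minimal sketch (application id, bootstrap servers, and topology are placeholders) of explicitly overriding the embedded consumer's reset policy back to "latest":
{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Streams defaults this to "earliest"; set it back to "latest" explicitly.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

Topology topology = new Topology(); // placeholder topology
KafkaStreams streams = new KafkaStreams(topology, props);
{code}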

> kafak stream consume from the earliest by default
> -
>
> Key: KAFKA-10862
> URL: https://issues.apache.org/jira/browse/KAFKA-10862
> Project: Kafka
>  Issue Type: Bug
>  Components: config, consumer
>Affects Versions: 2.3.1
> Environment: MAC
>Reporter: Yuexi Liu
>Priority: Major
>
> on [https://kafka.apache.org/documentation/#auto.offset.reset] it shows 
> auto.offset.reset is by default using latest, but from code, it is not
>  
> [https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L884]
>  and when I create a kafka stream without specified offset reset policy, it 
> consumed from the beginning



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10862) kafak stream consume from the earliest by default

2020-12-16 Thread Yuexi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuexi Liu updated KAFKA-10862:
--
Description: 
on [https://kafka.apache.org/documentation/#auto.offset.reset] it shows 
auto.offset.reset is by default using latest, but from code, it is not
 
[https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L884]
 and when I create a kafka stream without specified offset reset policy, it 
consumed from the beginning

  was:
on [https://kafka.apache.org/documentation/#auto.offset.reset] it shows 
auto.offset.reset is by default using latest, but from code, it is not
[https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L884]
and when I create a kafka stream without specified offset, it consumed from the 
beginning


> kafak stream consume from the earliest by default
> -
>
> Key: KAFKA-10862
> URL: https://issues.apache.org/jira/browse/KAFKA-10862
> Project: Kafka
>  Issue Type: Bug
>  Components: config, consumer
>Affects Versions: 2.3.1
> Environment: MAC
>Reporter: Yuexi Liu
>Priority: Major
>
> on [https://kafka.apache.org/documentation/#auto.offset.reset] it shows 
> auto.offset.reset is by default using latest, but from code, it is not
>  
> [https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L884]
>  and when I create a kafka stream without specified offset reset policy, it 
> consumed from the beginning



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10862) kafak stream consume from the earliest by default

2020-12-16 Thread Yuexi Liu (Jira)
Yuexi Liu created KAFKA-10862:
-

 Summary: kafak stream consume from the earliest by default
 Key: KAFKA-10862
 URL: https://issues.apache.org/jira/browse/KAFKA-10862
 Project: Kafka
  Issue Type: Bug
  Components: config, consumer
Affects Versions: 2.3.1
 Environment: MAC
Reporter: Yuexi Liu


on [https://kafka.apache.org/documentation/#auto.offset.reset] it shows 
auto.offset.reset is by default using latest, but from code, it is not
[https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L884]
and when I create a kafka stream without specified offset, it consumed from the 
beginning



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #9762: KAFKA-10861; Fix race condition in flaky test `testFencingOnSendOffsets`

2020-12-16 Thread GitBox


guozhangwang commented on pull request #9762:
URL: https://github.com/apache/kafka/pull/9762#issuecomment-747137717


   LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


d8tltanc commented on pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#issuecomment-747116007


   Thanks @rajinisivaram for the nit and test-structure suggestions. I've adopted those and restructured the test classes. Please let me know if we are good to merge now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r544713670



##
File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
##
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.net.InetAddress
+import java.util.UUID
+
+import kafka.security.auth.SimpleAclAuthorizer
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl.AclOperation._
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourceType._
+import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.annotation.nowarn
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+class AuthorizerWrapperTest extends ZooKeeperTestHarness {
+  @nowarn("cat=deprecation")
+  private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new SimpleAclAuthorizer)
+  @nowarn("cat=deprecation")
+  private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new SimpleAclAuthorizer)
+  private var resource: ResourcePattern = _
+  private val superUsers = "User:superuser1; User:superuser2"
+  private val username = "alice"
+  private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+  private val requestContext = newRequestContext(principal, InetAddress.getByName("192.168.0.1"))
+  private var config: KafkaConfig = _
+  private var zooKeeperClient: ZooKeeperClient = _
+
+  private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], ResourcePattern)] = ArrayBuffer()
+  private val authorizerTestFactory = new AuthorizerTestFactory(
+    newRequestContext, addAcls, authorizeByResourceType, removeAcls)
+
+  class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) {
+    override def equals(o: scala.Any): Boolean = false
+  }
+
+  @Before
+  @nowarn("cat=deprecation")
+  override def setUp(): Unit = {
+    super.setUp()
+
+    val props = TestUtils.createBrokerConfig(0, zkConnect)
+
+    props.put(AclAuthorizer.SuperUsersProp, superUsers)
+    config = KafkaConfig.fromProps(props)
+    wrappedSimpleAuthorizer.configure(config.originals)
+
+    props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+    config = KafkaConfig.fromProps(props)
+    wrappedSimpleAuthorizerAllowEveryone.configure(config.originals)
+
+    resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
+      Time.SYSTEM, "kafka.test", "AuthorizerWrapperTest")
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    val authorizers = Seq(wrappedSimpleAuthorizer, wrappedSimpleAuthorizerAllowEveryone)
+    authorizers.foreach(a => {
+      a.close()
+    })
+    zooKeeperClient.close()
+    super.tearDown()
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeMultipleAddAndRemove(): Unit = {
+    authorizerTestFactory.testAuthorizeByResourceTypeMultipleAddAndRemove(wrappedSimpleAuthorizer)
+  }

Review comment:
   commit 092fec70a9547ec07cba999e77be1c0cf79fa275
   commit e5e3d18f57ab22df20133f9841905af384d9b641
   
   These two commits condense the class methods and members into BaseAuthorizerTest.
   
   In BaseAuthorizerTest, the only abstract method is the authorizer provider. Once the provider is overridden, the test cases in it are sufficient to run.
   
   Now the test code looks much cleaner. If the changes look too

[GitHub] [kafka] hachikuji opened a new pull request #9762: KAFKA-10861; Fix race condition in flaky test `testFencingOnSendOffsets`

2020-12-16 Thread GitBox


hachikuji opened a new pull request #9762:
URL: https://github.com/apache/kafka/pull/9762


   I wasn't able to reproduce the failure locally, but it looks like there is a 
race condition with the sending of the records in the first producer. The test 
case assumes that these records have been completed before the call to 
`sendOffsetsToTransaction`, but they very well might not be. It is even 
possible for the writes from the second producer to arrive first which would 
then result in the test failure that we are seeing. The solution is to force 
the send with `flush()`.
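   
   A rough illustration of the fix (not the exact test code; the broker address, topic, offsets, and group name are made up):
   ```java
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.TopicPartition;
   
   public class FlushBeforeSendOffsets {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("transactional.id", "txn-1");
           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   
           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
               producer.initTransactions();
               producer.beginTransaction();
               producer.send(new ProducerRecord<>("topic", "k1", "v1"));
               // Without this, the record may still sit in the accumulator when a
               // second (fencing) producer's writes arrive at the broker first.
               producer.flush();
               producer.sendOffsetsToTransaction(
                   Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(1L)),
                   "my-group");
               producer.commitTransaction();
           }
       }
   }
   ```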
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10861) Flaky test `TransactionsTest.testFencingOnSendOffsets`

2020-12-16 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10861:
---

 Summary: Flaky test `TransactionsTest.testFencingOnSendOffsets`
 Key: KAFKA-10861
 URL: https://issues.apache.org/jira/browse/KAFKA-10861
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


{code}
org.scalatest.exceptions.TestFailedException: Got an unexpected exception from 
a fenced producer.
at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
at org.scalatest.Assertions.fail(Assertions.scala:1107)
at org.scalatest.Assertions.fail$(Assertions.scala:1103)
at org.scalatest.Assertions$.fail(Assertions.scala:1389)
at 
kafka.api.TransactionsTest.testFencingOnSendOffsets(TransactionsTest.scala:373)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at Caused by: 
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch (producerId=0, epoch=0)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10861) Flaky test `TransactionsTest.testFencingOnSendOffsets`

2020-12-16 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-10861:
---

Assignee: Jason Gustafson

> Flaky test `TransactionsTest.testFencingOnSendOffsets`
> --
>
> Key: KAFKA-10861
> URL: https://issues.apache.org/jira/browse/KAFKA-10861
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> {code}
> org.scalatest.exceptions.TestFailedException: Got an unexpected exception 
> from a fenced producer.
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>   at org.scalatest.Assertions.fail(Assertions.scala:1107)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1103)
>   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>   at 
> kafka.api.TransactionsTest.testFencingOnSendOffsets(TransactionsTest.scala:373)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at Caused by: 
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch (producerId=0, epoch=0)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #8162: KAFKA-9595: switch usage of `alterConfigs` to `incrementalAlterConfigs` for kafka-configs tool

2020-12-16 Thread GitBox


hachikuji commented on pull request #8162:
URL: https://github.com/apache/kafka/pull/8162#issuecomment-747090036


   @agam Going to close this for now. We can reopen once we are ready to pick 
it back up.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji closed pull request #8162: KAFKA-9595: switch usage of `alterConfigs` to `incrementalAlterConfigs` for kafka-configs tool

2020-12-16 Thread GitBox


hachikuji closed pull request #8162:
URL: https://github.com/apache/kafka/pull/8162


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (KAFKA-10140) Incremental config api excludes plugin config changes

2020-12-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reopened KAFKA-10140:
-

I resolved this by mistake, reopening now

> Incremental config api excludes plugin config changes
> -
>
> Key: KAFKA-10140
> URL: https://issues.apache.org/jira/browse/KAFKA-10140
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Critical
> Fix For: 2.7.0
>
>
> I was trying to alter the jmx metric filters using the incremental alter 
> config api and hit this error:
> ```
> java.util.NoSuchElementException: key not found: metrics.jmx.blacklist
>   at scala.collection.MapLike.default(MapLike.scala:235)
>   at scala.collection.MapLike.default$(MapLike.scala:234)
>   at scala.collection.AbstractMap.default(Map.scala:65)
>   at scala.collection.MapLike.apply(MapLike.scala:144)
>   at scala.collection.MapLike.apply$(MapLike.scala:143)
>   at scala.collection.AbstractMap.apply(Map.scala:65)
>   at kafka.server.AdminManager.listType$1(AdminManager.scala:681)
>   at 
> kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693)
>   at 
> kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687)
>   at 
> kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:154)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589)
>   at 
> kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:188)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> ```
> It looks like we are only allowing changes to the keys defined in 
> `KafkaConfig` through this API. This excludes config changes to any plugin 
> components such as `JmxReporter`. 
> Note that I was able to use the regular `alterConfig` API to change this 
> config.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10140) Incremental config api excludes plugin config changes

2020-12-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10140:

Fix Version/s: (was: 2.7.0)

> Incremental config api excludes plugin config changes
> -
>
> Key: KAFKA-10140
> URL: https://issues.apache.org/jira/browse/KAFKA-10140
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Critical
>
> I was trying to alter the jmx metric filters using the incremental alter 
> config api and hit this error:
> ```
> java.util.NoSuchElementException: key not found: metrics.jmx.blacklist
>   at scala.collection.MapLike.default(MapLike.scala:235)
>   at scala.collection.MapLike.default$(MapLike.scala:234)
>   at scala.collection.AbstractMap.default(Map.scala:65)
>   at scala.collection.MapLike.apply(MapLike.scala:144)
>   at scala.collection.MapLike.apply$(MapLike.scala:143)
>   at scala.collection.AbstractMap.apply(Map.scala:65)
>   at kafka.server.AdminManager.listType$1(AdminManager.scala:681)
>   at 
> kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693)
>   at 
> kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687)
>   at 
> kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:154)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589)
>   at 
> kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:188)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> ```
> It looks like we are only allowing changes to the keys defined in 
> `KafkaConfig` through this API. This excludes config changes to any plugin 
> components such as `JmxReporter`. 
> Note that I was able to use the regular `alterConfig` API to change this 
> config.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

2020-12-16 Thread GitBox


vvcephei merged pull request #9708:
URL: https://github.com/apache/kafka/pull/9708


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

2020-12-16 Thread GitBox


ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r544645060



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {

Review comment:
   It wouldn't be guaranteed to catch this, but either way I think we 
should have a test that starts up two threads which both try to 
`removeThread()` at the same time (and maybe similarly for `addStreamThread`)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

2020-12-16 Thread GitBox


ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r544643577



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {

Review comment:
   Ah, I missed the previous PR where you changed the list type. OK, I'm not that familiar with `Collections.synchronizedList`, but I'm still worried we may not be safe with this. From the [javadocs](https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html#synchronizedList(java.util.List)):
   
   ```
   It is imperative that the user manually synchronize on the returned list when iterating over it:
   ```
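   
   For example (a generic illustration of that javadoc requirement, not the actual Streams code), compound actions like our iteration would need to lock the list manually:
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   
   public class SynchronizedListIteration {
       public static void main(String[] args) {
           final List<String> threads = Collections.synchronizedList(new ArrayList<>());
           threads.add("StreamThread-1");
   
           // Individual calls like add() and get() are synchronized internally,
           // but iteration is a compound action: without this lock, another
           // thread could mutate the list mid-iteration and trigger a
           // ConcurrentModificationException.
           synchronized (threads) {
               for (final String name : threads) {
                   System.out.println(name);
               }
           }
       }
   }
   ```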





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

2020-12-16 Thread GitBox


jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r544634608



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
      */
     if (localLog(topicPartition).isEmpty)
       markPartitionOffline(topicPartition)
+    else {
+      val id = topicIds.get(topicPartition.topic())
+      // Ensure we have not received a request from an older protocol
+      if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+        val log = localLog(topicPartition).get
+        // Check if the topic ID is in memory, if not, it must be new to the broker.
+        // If the broker previously wrote it to file, it would be recovered on restart after failure.
+        // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+        // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+        if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+          stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+            s" match the topic Id provided in the request: " +
+            s"${topicIds.get(topicPartition.topic).toString}.")
+        } else {
+          // There is not yet a topic ID stored in the log.
+          // Write the partition metadata file if it is empty.
+          if (log.partitionMetadataFile.get.isEmpty()) {
+            log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+            log.topicId = topicIds.get(topicPartition.topic)
+          } else {
+            stateChangeLogger.warn("Partition metadata file already contains content.")

Review comment:
   So I thought through these cases some more and realized that the metadata file will fail to open if it is formatted incorrectly. So the only case where there could be data written to the file is if the ID is the zero UUID, and I decided to just fail on reading the file if the zero ID is present. (We will never write the zero ID to the file.) The rest of this cleaned up pretty nicely.
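   
   In other words, the read path can treat the sentinel as corruption, roughly like this (a hypothetical sketch; `readTopicIdFromFile` is a made-up helper, not the actual parsing code):
   ```java
   Uuid topicId = readTopicIdFromFile(partitionMetadataFile);
   if (Uuid.ZERO_UUID.equals(topicId)) {
       // The broker never writes the zero ID, so reading one back means the
       // file contents are invalid.
       throw new KafkaException("Invalid topic ID in partition metadata file");
   }
   ```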





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

2020-12-16 Thread GitBox


hachikuji commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r544611285



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1556,6 +1572,8 @@ class Log(@volatile private var _dir: File,
   done = fetchDataInfo != null || segmentEntry == null
 }
 
+checkForLogDirFailure()

Review comment:
   Seems more intuitive to move this check before the segment read. I don't 
think we can totally avoid race conditions with a failure in append since we 
don't have the lock here. Perhaps we could even move this check to 
`maybeHandleIOException` so that we handle all cases? 
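   
   For illustration, a rough sketch of that last idea (hypothetical body and 
helper names; the real method lives in `Log.scala`):
   
   ```scala
   import java.io.IOException
   import org.apache.kafka.common.errors.KafkaStorageException
   
   // Hypothetical shape: check for an already-failed dir before running the
   // IO action, so every read/append wrapped this way observes the failure.
   def maybeHandleIOException[T](msg: => String)(action: => T): T = {
     checkForLogDirFailure() // throws if the log dir has already failed
     try action
     catch {
       case e: IOException =>
         markLogDirOffline() // hypothetical helper for this sketch
         throw new KafkaStorageException(msg, e)
     }
   }
   ```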

##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -2818,6 +2818,22 @@ class LogTest {
   new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, 
"value".getBytes)), leaderEpoch = 0)
   }
 
+  @Test
+  def testAppendToOrReadFromLogInFailedLogDir(): Unit = {
+val log = createLog(logDir, LogConfig())
+log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 
0)
+assertEquals(0, readLog(log, 0, 
4096).records.records.iterator.next().offset)
+try {
+  log.maybeHandleIOException("Simulating failed log dir") {

Review comment:
   Another way to trigger an IO exception is to rename the log file. This 
trick is used in `testAppendToTransactionIndexFailure`. Then we don't need to 
expose `maybeHandleIOException` for testing.
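   
   A rough sketch of the rename trick in isolation (paths are hypothetical; the 
real test renames the log's actual segment file):
   
   ```scala
   import java.io.File
   
   // Renaming the backing file out from under an open log makes the next
   // append fail with an IOException, simulating a failed log dir without
   // exposing test-only hooks.
   val segment = new File("/tmp/kafka-logs/topic-0/00000000000000000000.log")
   val moved = new File(segment.getParentFile, segment.getName + ".moved")
   if (!segment.renameTo(moved))
     sys.error(s"failed to rename $segment")
   // log.appendAsLeader(...) would now throw, marking the dir offline
   ```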





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2020-12-16 Thread GitBox


hachikuji commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r544593141



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##
@@ -198,7 +198,7 @@ private static FilterResult filterTo(TopicPartition 
partition, Iterable

[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-16 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250598#comment-17250598
 ] 

Kyle Ambroff-Kao commented on KAFKA-10853:
--

Thanks for the reply Jun!
{quote}As for your proposal of using largestAckedOffset, the challenge is that 
we allow producers with different ack mode even on the same topic.
{quote}
Hrm, yeah, that's a great point. I think for our internal use case the 
largestAckedOffset change would be OK, since a producer that doesn't use 
acks=all would be a configuration bug. But it's true that one mis-configured 
client would ruin it for everyone.

I guess my proposal would then have to require some strict ordering of 
acknowledgements to producers, blocking acknowledgment of an acks=1 produce 
request until an acks=all request that came before it completes. That's 
obviously terrible.

Or I guess it could be done with a new topic configuration that puts the topic 
into a durable mode, enforcing acks=all regardless of what the ProduceRequest 
says. That sounds less terrible.
{quote}It sounds like you already have a separate mechanism to detect the 
slowness of a broker based on request.max.local.time.ms. So, another approach 
could be communicating this info to the controller so that it can choose to 
move the leader off the slow broker. It seems that this could solve the problem 
if request.max.local.time.ms < replica.lag.time.max.ms.
{quote}
Hrm, that's an interesting proposal. I like this idea. We could change our code 
so that, instead of halting when request.max.local.time.ms is exceeded, the 
broker just stops accepting client traffic (except for follower fetch requests) 
and signals to the controller that leadership should be shifted elsewhere. That 
sounds really nice.

I'm nervous about the idea of changing our replica.lag.time.max.ms though. We 
have had that set to 10 seconds for many years now and I'm nervous about 
increasing it. 10 seconds already feels like it is too long.

And decreasing replica.lag.time.max.ms seems scary too because we don't want 
false positives. We've set request.max.local.time.ms to 60 seconds through 
experience and trial and error, since we generally don't see GC pauses that 
long, but we do see IOPS pause for that long when we have a real storage issue.

The other problem is that I think handling failures this way only works for 
certain failure modes, like a really slow or failing disk. It doesn't handle 
network partitions at all, since we may not be able to reach the controller. Or 
the pause may be caused by something else that prevents any thread from being 
scheduled.

I need to think about this for a bit.

> Replication protocol deficiencies with workloads requiring high durability 
> guarantees
> -
>
> Key: KAFKA-10853
> URL: https://issues.apache.org/jira/browse/KAFKA-10853
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Kyle Ambroff-Kao
>Priority: Major
>
> *tl;dr: The definition of ISR and the consistency model from the perspective 
> of the producer seem a bit out of sync*
> We have many systems in production that trade off availability in order to 
> provide stronger consistency guarantees. Most of these configurations look 
> like this:
> Topic configuration:
>  * replication factor 3
>  * min.insync.replicas=2
>  * unclean.leader.election.enable=false
> Producer configuration:
>  * acks=all
> Broker configuration:
> replica.lag.time.max.ms=10000
> So the goal here is to reduce the chance of ever dropping a message that the 
> leader has acknowledged to the producer.
> This works great, except that we've found some situations in production where 
> we are forced to enable unclean leader election to recover, which we never 
> want to do. These situations all seem totally avoidable with some small 
> tweaks to the replication protocol.
> *A scenario we've seen many times*
> The following sequence of events is in time order: a replica set for a 
> topic-partition TP with leader L and replicas R1 and R2. All three replicas 
> are in ISR.
>  # Producer sends ProduceRequest R with acks=all that contains a message 
> batch to the leader L.
>  # L receives R and appends the batch it contains to the active segment of TP 
> but does not ack to the producer yet because the request was acks=all
>  # A storage fault occurs on L which makes all IOPS take a long time but 
> doesn't cause a hard failure.
>  # R1 and R2 send follower fetch requests to L which are infinitely delayed 
> due to the storage fault on L.
>  # 10 seconds after receiving the batch and appending it to the log, L 
> shrinks the ISR, removing R1 and R2. This is because ISR is defined as at 
> most replica.lag.time.max.ms milliseconds behind the log append time of the 
> leader end offset. The leader end offset is a message that has not been 
> replicated yet.

[GitHub] [kafka] hachikuji commented on a change in pull request #9713: KAFKA-10825 ZK ISR manager

2020-12-16 Thread GitBox


hachikuji commented on a change in pull request #9713:
URL: https://github.com/apache/kafka/pull/9713#discussion_r544581576



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1374,47 +1314,28 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  private def shrinkIsrWithZk(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
-if (zkVersionOpt.isDefined) {
-  isrChangeListener.markShrink()
-}
-maybeUpdateIsrAndVersionWithZk(newIsr, zkVersionOpt)
-  }
-
-  private def maybeUpdateIsrAndVersionWithZk(isr: Set[Int], zkVersionOpt: 
Option[Int]): Unit = {
-zkVersionOpt match {
-  case Some(newVersion) =>
-isrState = CommittedIsr(isr)
-zkVersion = newVersion
-info("ISR updated to [%s] and zkVersion updated to 
[%d]".format(isr.mkString(","), zkVersion))
-
-  case None =>
-info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, 
skip updating ISR")
-isrChangeListener.markFailed()
-}
-  }
-
   private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
 val isrToSend: Set[Int] = proposedIsrState match {
   case PendingExpandIsr(isr, newInSyncReplicaId) => isr + 
newInSyncReplicaId
   case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- 
outOfSyncReplicaIds
   case state =>
+isrChangeListener.markFailed()
 throw new IllegalStateException(s"Invalid state $state for `AlterIsr` 
request for partition $topicPartition")
 }
 
 val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
-val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, 
handleAlterIsrResponse(proposedIsrState))
+val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, 
handleAlterIsrResponse(proposedIsrState), controllerEpoch)
 
-if (!alterIsrManager.enqueue(alterIsrItem)) {
-  isrChangeListener.markFailed()
-  throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request 
with state " +
-s"$newLeaderAndIsr for partition $topicPartition")
-}
-
-isrState = proposedIsrState
-debug(s"Sent `AlterIsr` request to change state to $newLeaderAndIsr after 
transition to $proposedIsrState")
+alterIsrManager.submit(alterIsrItem, (wasSubmitted: Boolean) => {
+  if (wasSubmitted) {
+isrState = proposedIsrState

Review comment:
   Could we solve the problem by moving this above the call to `submit`? 
Then the callback logic should work even if the change is made synchronously. 
In the case that the request fails to be submitted, then we can reset to the 
previous state. What do you think?
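   
   A self-contained sketch of that ordering (hypothetical types, not 
`Partition`'s actual fields): apply the pending state before submitting, and 
reset it only if the submission itself is rejected, so a callback that fires 
synchronously during submit already observes the proposed state.
   
   ```scala
   // Optimistic state update with rollback on failed submission.
   final class OptimisticState[S](initial: S) {
     @volatile private var current: S = initial
     def state: S = current
     def propose(next: S)(submit: => Boolean): Unit = {
       val previous = current
       current = next                  // visible before/while submit runs
       if (!submit) current = previous // roll back if not submitted
     }
   }
   ```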





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-16 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250568#comment-17250568
 ] 

Jun Rao commented on KAFKA-10853:
-

[~ambroff] : Thanks for reporting this. I agree that this is a real problem. The 
mechanism for replica.lag.time.max.ms works well when the slowness is in the 
followers, but not as well when the slowness is in the leader. 

As for your proposal of using largestAckedOffset, the challenge is that we 
allow producers with different ack mode even on the same topic.

It sounds like you already have a separate mechanism to detect the slowness of 
a broker based on request.max.local.time.ms. So, another approach could be 
communicating this info to the controller so that it can choose to move the 
leader off the slow broker. It seems that this could solve the problem if 
request.max.local.time.ms < replica.lag.time.max.ms.

> Replication protocol deficiencies with workloads requiring high durability 
> guarantees
> -
>
> Key: KAFKA-10853
> URL: https://issues.apache.org/jira/browse/KAFKA-10853
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Kyle Ambroff-Kao
>Priority: Major
>
> *tl;dr: The definition of ISR and the consistency model from the perspective 
> of the producer seem a bit out of sync*
> We have many systems in production that trade off availability in order to 
> provide stronger consistency guarantees. Most of these configurations look 
> like this:
> Topic configuration:
>  * replication factor 3
>  * min.insync.replicas=2
>  * unclean.leader.election.enable=false
> Producer configuration:
>  * acks=all
> Broker configuration:
> replica.lag.time.max.ms=10000
> So the goal here is to reduce the chance of ever dropping a message that the 
> leader has acknowledged to the producer.
> This works great, except that we've found some situations in production where 
> we are forced to enable unclean leader election to recover, which we never 
> want to do. These situations all seem totally avoidable with some small 
> tweaks to the replication protocol.
> *A scenario we've seen many times*
> The following sequence of events is in time order: a replica set for a 
> topic-partition TP with leader L and replicas R1 and R2. All three replicas 
> are in ISR.
>  # Producer sends ProduceRequest R with acks=all that contains a message 
> batch to the leader L.
>  # L receives R and appends the batch it contains to the active segment of TP 
> but does not ack to the producer yet because the request was acks=all
>  # A storage fault occurs on L which makes all IOPS take a long time but 
> doesn't cause a hard failure.
>  # R1 and R2 send follower fetch requests to L which are infinitely delayed 
> due to the storage fault on L.
>  # 10 seconds after receiving the batch and appending it to the log, L 
> shrinks the ISR, removing R1 and R2. This is because ISR is defined as at 
> most replica.lag.time.max.ms milliseconds behind the log append time of the 
> leader end offset. The leader end offset is a message that has not been 
> replicated yet.
> The storage fault example in step 3 could easily be another kind of fault. 
> Say for example, L is partitioned from R1 and R2 but not from ZooKeeper or 
> the producer.
> The producer never receives acknowledgement of the ProduceRequest because the 
> min.insync.replicas constraint was never satisfied. So in terms of data 
> consistency, everything is working fine.
> The problem is recovering from this situation. If the fault on L is not a 
> temporary blip, then L needs to be replaced. But since L shrunk the ISR, the 
> only way that leadership can move to either R1 or R2 is to set 
> unclean.leader.election.enable=true.
> This works but it is a potentially unsafe way to recover and move leadership. 
> It would be better to have other options.
> *Recovery could be automatic in this scenario.*
> If you think about it, from the perspective of the producer, the write was 
> not acknowledged, and therefore, L, R1 and R2 are actually in-sync. So it 
> should actually be totally safe for leadership to transition to either R1 or 
> R2.
> It seems that the producer and the leader don't have fully compatible 
> definitions for what it means for the replica set to be in-sync. If the 
> leader L used different rules for defining ISR, it could allow self-healing 
> in this or similar scenarios, since the ISR would not shrink.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10860) JmxTool fails with NPE when object-name contains a wildcard

2020-12-16 Thread Bob Barrett (Jira)
Bob Barrett created KAFKA-10860:
---

 Summary: JmxTool fails with NPE when object-name contains a 
wildcard
 Key: KAFKA-10860
 URL: https://issues.apache.org/jira/browse/KAFKA-10860
 Project: Kafka
  Issue Type: Bug
Reporter: Bob Barrett


When running JmxTool with a wildcard in the object name, the tool fails with a 
NullPointerException:
{code:java}
bin/kafka-run-class kafka.tools.JmxTool --jmx-url 
service:jmx:rmi:///jndi/rmi://localhost:/jmxrmi --object-name 
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
Trying to connect to JMX url: 
service:jmx:rmi:///jndi/rmi://localhost:/jmxrmi.
Exception in thread "main" java.lang.NullPointerException at 
kafka.tools.JmxTool$.main(JmxTool.scala:194) at 
kafka.tools.JmxTool.main(JmxTool.scala)
{code}
It seems that we never populate the `names` variable when the object name 
includes a pattern:
{code:java}
var names: Iterable[ObjectName] = null
def namesSet = Option(names).toSet.flatten
def foundAllObjects = queries.toSet == namesSet
val waitTimeoutMs = 10000
if (!hasPatternQueries) {
  val start = System.currentTimeMillis
  do {
    if (names != null) {
      System.err.println("Could not find all object names, retrying")
      Thread.sleep(100)
    }
    names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
  } while (wait && System.currentTimeMillis - start < waitTimeoutMs && !foundAllObjects)
}

if (wait && !foundAllObjects) {
  val missing = (queries.toSet -- namesSet).mkString(", ")
  System.err.println(s"Could not find all requested object names after $waitTimeoutMs ms. Missing $missing")
  System.err.println("Exiting.")
  sys.exit(1)
}

val numExpectedAttributes: Map[ObjectName, Int] =
  if (!attributesWhitelistExists)
    names.map { name: ObjectName =>
      val mbean = mbsc.getMBeanInfo(name)
      (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)
    }.toMap
  else {
    if (!hasPatternQueries)
      names.map { name: ObjectName =>
        val mbean = mbsc.getMBeanInfo(name)
        val attributes = mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))
        val expectedAttributes = attributes.asScala.asInstanceOf[mutable.Buffer[Attribute]]
          .filter(attr => attributesWhitelist.get.contains(attr.getName))
        (name, expectedAttributes.size)
      }.toMap.filter(_._2 > 0)
    else
      queries.map((_, attributesWhitelist.get.length)).toMap
  }
{code}
We need to add logic to query the object names that match the pattern when a 
pattern is part of the input.
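A minimal sketch of that missing branch, reusing the variables from the snippet 
above (illustrative, not a tested patch):
{code:java}
// When the query is a pattern, queryNames() expands it to the concrete
// matching ObjectNames, so `names` is populated before it is dereferenced.
if (hasPatternQueries)
  names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
{code}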



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] scott-hendricks commented on a change in pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.

2020-12-16 Thread GitBox


scott-hendricks commented on a change in pull request #9736:
URL: https://github.com/apache/kafka/pull/9736#discussion_r544547465



##
File path: 
tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Random;
+
+/**
+ * This class behaves identically to TimestampRandomPayloadGenerator, except 
the message size follows a gaussian
+ * distribution.
+ *
+ * This should be used in conjunction with TimestampRecordProcessor in the 
Consumer to measure true end-to-end latency
+ * of a system.
+ *
+ * `messageSizeAverage` - The average size in bytes of each message.
+ * `messageSizeDeviation` - The standard deviation to use when calculating 
message size.
+ * `timestampBytes` - The number of bytes to use for the timestamp.  Usually 8.
+ * `messagesUntilSizeChange` - The number of messages to keep at the same size.
+ * `seed` - Used to initialize Random() to remove some non-determinism.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *"type": "gaussianTimestampRandom",
+ *"messageSizeAverage": 512,
+ *"messageSizeDeviation": 100,
+ *"timestampBytes": 8,
+ *"messagesUntilSizeChange": 100
+ * }
+ *
+ * This will generate messages on a gaussian distribution with an average size 
of 512 bytes and the first 8 bytes
+ * encoded with the timestamp.  The message sizes will have a standard 
deviation of 100 bytes, and the size will only
+ * change every 100 messages.  The distribution of messages will be as follows:
+ *
+ *The average size of the messages is 512 bytes.
+ *~68% of the messages are between 412 and 612 bytes
+ *~95% of the messages are between 312 and 712 bytes
+ *~99% of the messages are between 212 and 812 bytes
+ */
+
+public class GaussianTimestampRandomPayloadGenerator implements 
PayloadGenerator {
+private final int messageSizeAverage;
+private final int messageSizeDeviation;
+private final int timestampBytes;
+private final int messagesUntilSizeChange;
+private final long seed;
+
+private final Random random = new Random();
+private final ByteBuffer buffer;
+
+private int messageTracker = 0;
+private int messageSize = 0;
+
+@JsonCreator
+public 
GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int 
messageSizeAverage,
+   
@JsonProperty("messageSizeDeviation") int messageSizeDeviation,
+   
@JsonProperty("timestampBytes") int timestampBytes,

Review comment:
   Removed timestampBytes.
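   
   For intuition, a self-contained sketch of the size-selection logic the 
javadoc above describes (illustrative only, not the PR's implementation):
   
   ```scala
   import java.util.Random
   
   // Draw a new gaussian-distributed size only once every
   // `messagesUntilSizeChange` messages, as the javadoc describes.
   class GaussianSize(avg: Int, dev: Int, messagesUntilSizeChange: Int, seed: Long) {
     private val random = new Random(seed)
     private var remaining = 0
     private var size = 0
     def next(): Int = {
       if (remaining == 0) {
         size = math.max(0, (random.nextGaussian() * dev + avg).round.toInt)
         remaining = messagesUntilSizeChange
       }
       remaining -= 1
       size
     }
   }
   ```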





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r544547081



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -116,38 +122,88 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>());
 
 for (int aclId = 0; aclId < aclCount; aclId++) {
-AccessControlEntry ace = new 
AccessControlEntry(principal.toString() + aclId,
-"*", AclOperation.READ, AclPermissionType.ALLOW);
-entries.add(new AclEntry(ace));
+// The principal in the request context we are using

Review comment:
   commit ec80dc4e55758d83835f3ecde381a988d6dd4779





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r544546946



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTestFactory.scala
##
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.authorizer
+
+import java.net.InetAddress
+import java.util.UUID
+
+import kafka.security.authorizer.AclEntry.{WildcardHost, 
WildcardPrincipalString}
+import org.apache.kafka.common.acl.AclOperation.{ALL, READ, WRITE}
+import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
+import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
+import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, 
TRANSACTIONAL_ID}
+import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.server.authorizer.Authorizer
+import org.junit.Assert.{assertFalse, assertTrue}
+
+class AuthorizerTestFactory(val newRequestContext3: (KafkaPrincipal, 
InetAddress, ApiKeys) => RequestContext,
+val addAcls: (Authorizer, Set[AccessControlEntry], 
ResourcePattern) => Unit,
+val authorizeByResourceType: (Authorizer, 
RequestContext, AclOperation, ResourceType) => Boolean,
+val removeAcls: (Authorizer, 
Set[AccessControlEntry], ResourcePattern) => Unit) {
+  def newRequestContext(kafkaPrincipal: KafkaPrincipal, inetAddress: 
InetAddress): RequestContext =
+newRequestContext3(kafkaPrincipal, inetAddress, ApiKeys.PRODUCE)
+
+  def testAuthorizeByResourceTypeMultipleAddAndRemove(authorizer: Authorizer): 
Unit = {
+val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+val host1 = InetAddress.getByName("192.168.1.1")
+val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
+val denyRead = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, DENY)
+val allowRead = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, ALLOW)
+val u1h1Context = newRequestContext(user1, host1)
+
+for (_ <- 1 to 10) {
+  assertFalse("User1 from host1 should not have READ access to any topic 
when no ACL exists",
+authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+  addAcls(authorizer, Set(allowRead), resource1)
+  assertTrue("User1 from host1 now should have READ access to at least one 
topic",
+authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+  for (_ <- 1 to 10) {
+addAcls(authorizer, Set(denyRead), resource1)
+assertFalse("User1 from host1 now should not have READ access to any 
topic",
+  authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+removeAcls(authorizer, Set(denyRead), resource1)
+addAcls(authorizer, Set(allowRead), resource1)
+assertTrue("User1 from host1 now should have READ access to at least 
one topic",
+  authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+  }
+
+  removeAcls(authorizer, Set(allowRead), resource1)
+  assertFalse("User1 from host1 now should not have READ access to any 
topic",
+authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+}
+  }
+
+   def 
testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow(authorizer: 
Authorizer): Unit = {
+val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2")
+val host1 = InetAddress.getByName("192.168.1.1")
+val host2 = InetAddress.getByName("192.168.1.2")
+val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
+val resource2 = new ResourcePattern(TOPIC, "sb2" + 

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r544545938



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,134 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * It is important to override this interface default in implementations 
because
+ * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+ *which is CPU-intensive work.
+ * 2. The interface default rebuilds several sets of strings, which is 
memory-intensive work.
+ * 3. The interface default cannot perform the audit logging properly.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+if (authorize(requestContext, Collections.singletonList(new Action(
+AclOperation.READ,

Review comment:
   commit ec80dc4e55758d83835f3ecde381a988d6dd4779

##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,134 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * It is important to override this interface default in implementations 
because
+ * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+ *which is CPU-intensive work.
+ * 2. The interface default rebuilds several sets of strings, which is 
memory-intensive work.
+ * 3. The interface default cannot perform the audit logging properly.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+if (authorize(requestContext, Collections.singletonList(new Action(
+AclOperation.READ,
+new ResourcePattern(resourceType, "hardcode", 
PatternType.LITERAL),
+0, false, false)))

Review comment:
   commit ec80dc4e55758d83835f3ecde381a988d6dd4779

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +310,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, false)

Review comment:
   commit 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-12-16 Thread GitBox


rajinisivaram commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544515142



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
   error(s"Listeners are not identical across brokers: $aliveNodes")
   }
 
+  val newTopicIds = updateMetadataRequest.topicStates().asScala
+.map(topicState => (topicState.topicName(), topicState.topicId()))
+.filter(_._2 != Uuid.ZERO_UUID).toMap
+  val topicIds = mutable.Map.empty[String, Uuid]
+  topicIds.addAll(metadataSnapshot.topicIds)
+  topicIds.addAll(newTopicIds)

Review comment:
   Yes, that was my suggestion too, but my wording wasn't right. I will 
reword that to avoid confusion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

2020-12-16 Thread GitBox


jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r544483664



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
*/
 if (localLog(topicPartition).isEmpty)
   markPartitionOffline(topicPartition)
+else {
+  val id = topicIds.get(topicPartition.topic())
+  // Ensure we have not received a request from an older protocol
+  if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+val log = localLog(topicPartition).get
+// Check if the topic ID is in memory, if not, it must be new 
to the broker.
+// If the broker previously wrote it to file, it would be 
recovered on restart after failure.
+// If the topic ID is not the default (ZERO_UUID), a topic ID 
is being used for the given topic.
+// If the topic ID in the log does not match the one in the 
request, the broker's topic must be stale.
+if (!log.topicId.equals(Uuid.ZERO_UUID) && 
!log.topicId.equals(topicIds.get(topicPartition.topic))) {
+  stateChangeLogger.warn(s"Topic Id in memory: 
${log.topicId.toString} does not" +
+s" match the topic Id provided in the request: " +
+s"${topicIds.get(topicPartition.topic).toString}.")
+} else {
+  // There is not yet a topic ID stored in the log.
+  // Write the partition metadata file if it is empty.
+  if (log.partitionMetadataFile.get.isEmpty()) {
+
log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+log.topicId = topicIds.get(topicPartition.topic)
+  } else {
+stateChangeLogger.warn("Partition metadata file already 
contains content.")

Review comment:
   I might think about making this code cleaner in general to avoid so many 
nested if statements





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

2020-12-16 Thread GitBox


jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r544483238



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
*/
 if (localLog(topicPartition).isEmpty)
   markPartitionOffline(topicPartition)
+else {
+  val id = topicIds.get(topicPartition.topic())
+  // Ensure we have not received a request from an older protocol
+  if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+val log = localLog(topicPartition).get
+// Check if the topic ID is in memory, if not, it must be new 
to the broker.
+// If the broker previously wrote it to file, it would be 
recovered on restart after failure.
+// If the topic ID is not the default (ZERO_UUID), a topic ID 
is being used for the given topic.
+// If the topic ID in the log does not match the one in the 
request, the broker's topic must be stale.
+if (!log.topicId.equals(Uuid.ZERO_UUID) && 
!log.topicId.equals(topicIds.get(topicPartition.topic))) {
+  stateChangeLogger.warn(s"Topic Id in memory: 
${log.topicId.toString} does not" +
+s" match the topic Id provided in the request: " +
+s"${topicIds.get(topicPartition.topic).toString}.")
+} else {
+  // There is not yet a topic ID stored in the log.
+  // Write the partition metadata file if it is empty.
+  if (log.partitionMetadataFile.get.isEmpty()) {
+
log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+log.topicId = topicIds.get(topicPartition.topic)
+  } else {
+stateChangeLogger.warn("Partition metadata file already 
contains content.")

Review comment:
   Oops. I think I cleaned up this block and deleted something. There 
should be a check if log.topicId.equals(id). If so, then the file exists and we 
shouldn't go into the block that says "// There is not yet a topic ID stored 
in the log."
   
   I should also fix the topicIds.get(topicPartition.topic) above and replace 
with id.
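   
   Sketched out, the corrected branching might look roughly like this (reusing 
the snippet's names, with `id` already checked non-null and non-zero; not the 
final code):
   
   ```scala
   if (log.topicId == Uuid.ZERO_UUID) {
     // No ID stored yet: persist the one from the request.
     log.partitionMetadataFile.get.write(id)
     log.topicId = id
   } else if (log.topicId != id) {
     stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId} does not " +
       s"match the topic Id provided in the request: $id.")
   }
   // else: the IDs match and the file was already written; nothing to do.
   ```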





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-12-16 Thread GitBox


jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544479887



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
   error(s"Listeners are not identical across brokers: $aliveNodes")
   }
 
+  val newTopicIds = updateMetadataRequest.topicStates().asScala
+.map(topicState => (topicState.topicName(), topicState.topicId()))
+.filter(_._2 != Uuid.ZERO_UUID).toMap
+  val topicIds = mutable.Map.empty[String, Uuid]
+  topicIds.addAll(metadataSnapshot.topicIds)
+  topicIds.addAll(newTopicIds)

Review comment:
   So it seems like there is some logic to remove the partition states of 
deleted topics from the MetadataSnapshot. Would we want to do something similar 
there but with topic Ids? Apologies if I'm missing something
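   
   For illustration, the shape that symmetric cleanup might take (a sketch 
reusing the snippet's names; `deletedTopics` is assumed to be derived the same 
way the existing partition-state cleanup derives it):
   
   ```scala
   // Merge the cached IDs with the new ones, then drop the IDs of topics this
   // UpdateMetadata request deletes, mirroring the partition-state cleanup.
   val topicIds = mutable.Map.empty[String, Uuid]
   topicIds ++= metadataSnapshot.topicIds
   topicIds ++= newTopicIds
   deletedTopics.foreach(topicIds.remove)
   ```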





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bertber opened a new pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest

2020-12-16 Thread GitBox


bertber opened a new pull request #9761:
URL: https://github.com/apache/kafka/pull/9761


   I added a test for ByteBufferInputStream in ByteBufferLogInputStreamTest.
   First, I construct a ByteBufferInputStream over a non-empty ByteBuffer, in 
order to verify it.
   After that, I call ByteBufferInputStream's read function and check whether 
the return value is correct.
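   
   A rough sketch of that check (illustrative, not the PR's exact test code):
   
   ```scala
   import java.nio.ByteBuffer
   import org.apache.kafka.common.utils.ByteBufferInputStream
   
   // Read back the wrapped buffer's bytes one at a time, then expect -1
   // once the buffer is exhausted.
   val in = new ByteBufferInputStream(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
   assert(in.read() == 1)
   assert(in.read() == 2)
   assert(in.read() == 3)
   assert(in.read() == -1)
   ```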
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10759) ARM support for Kafka

2020-12-16 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250438#comment-17250438
 ] 

Jun Rao commented on KAFKA-10759:
-

[~xiaopenglei]: Thanks for the jira. Perhaps we can file an Apache infra ticket 
to see if ASF jenkins supports or plans to support ARM based servers.

> ARM support for Kafka
> -
>
> Key: KAFKA-10759
> URL: https://issues.apache.org/jira/browse/KAFKA-10759
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: PengLei
>Priority: Major
> Attachments: build_output.log, run_test_output.log
>
>
> ARM support for Kafka.
> I tried to deploy a Kafka cluster on an ARM server, but unfortunately I 
> did not find an official ARM release for Kafka. I think more and more 
> people will try the same thing as I did.
> Now the CI of Kafka (on GitHub) is handled by Jenkins CI. The tests run 
> under the x86 arch, while the ARM arch is missing. This leads to a problem: 
> we have no way to test whether a pull request breaks the Kafka deployment 
> on ARM. Similarly, we cannot provide an ARM release package without ARM CI.
> If the Apache Kafka community is interested, I can help with the 
> integration.
> This is the umbrella issue to track the efforts to make Kafka run on ARM 
> processors.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

2020-12-16 Thread GitBox


jolshan commented on pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#issuecomment-746586891


   @rajinisivaram Yup, will look at the rebase next.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-12-16 Thread GitBox


ning2008wisc commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-746543315


   @mimaison thanks so much for all your efforts on reviewing. It would be 
really appreciated if you have time before the end of this year for 1-2 final 
reviews to merge this PR. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2020-12-16 Thread Justin Jack (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250410#comment-17250410
 ] 

Justin Jack commented on KAFKA-7641:


stanislavkozlovski commented on pull request #6163: KAFKA-7641: Introduce 
"consumer.group.max.size" config to limit consumer group sizes
URL: https://github.com/apache/kafka/pull/6163

This patch introduces a new config - "consumer.group.max.size", which caps the 
maximum size any consumer group can reach. It has a default value of 
Int.MAX_VALUE.
Once a consumer group is of the maximum size, subsequent JoinGroup requests 
receive a MAX_SIZE_REACHED error.

In the case where the config is changed and a Coordinator broker with the new 
config loads an old group that is over the threshold, members are kicked out of 
the group and a rebalance is forced.

I have added two integration tests for both scenarios - a member joining an 
already-full group and a rolling restart with a new config

Committer Checklist (excluded from commit message)
[ ] Verify design and implementation
[ ] Verify test coverage and CI build status
[ ] Verify documentation (including upgrade notes)


> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Boyang Chen
>Assignee: Stanislav Kozlovski
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on broker side to avoid one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config since most times this value is only dealing with error 
> scenarios, client users shouldn't worry about this config.
> KIP-389: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10798) Failed authentication delay doesn't work with some SASL authentication failures

2020-12-16 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-10798:
---
Fix Version/s: 2.6.2
   2.7.1

> Failed authentication delay doesn't work with some SASL authentication 
> failures
> ---
>
> Key: KAFKA-10798
> URL: https://issues.apache.org/jira/browse/KAFKA-10798
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> KIP-306 introduced the config `connection.failed.authentication.delay.ms` to 
> delay connection closing on brokers for failed authentication to limit the 
> rate of retried authentications from clients in order to avoid excessive 
> authentication load on brokers from failed clients. We rely on authentication 
> failure response to be delayed in this case to prevent clients from detecting 
> the failure and retrying sooner.
> SaslServerAuthenticator delays response for SaslAuthenticationException, but 
> not for SaslException, even though SaslException is also converted into 
> SaslAuthenticationException and processed as an authentication failure by 
> both server and clients. As a result, connection delay is not applied in many 
> scenarios like SCRAM authentication failures.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

2020-12-16 Thread GitBox


vvcephei commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-746468357


   Test failure was unrelated:
Build / JDK 11 / 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] govi20 commented on pull request #9760: KAFKA-10850: Use 'Int.box' to replace deprecated 'new Integer' from BrokerToControllerRequestThreadTest

2020-12-16 Thread GitBox


govi20 commented on pull request #9760:
URL: https://github.com/apache/kafka/pull/9760#issuecomment-746391613


   Hi @chia7712 , can you please review this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] govi20 opened a new pull request #9760: KAFKA-10850: Use 'Int.box' to replace deprecated 'new Integer' from BrokerToControllerRequestThreadTest

2020-12-16 Thread GitBox


govi20 opened a new pull request #9760:
URL: https://github.com/apache/kafka/pull/9760


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
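   
   For context, the change the title describes has roughly this shape 
(illustrative only; `new Integer(...)` is deprecated since Java 9, and Scala's 
`Int.box` delegates to `Integer.valueOf`):
   
   ```scala
   val before = new Integer(1) // deprecated boxing constructor
   val after  = Int.box(1)     // preferred replacement
   ```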
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #9748: MINOR: Simplify ApiKeys by relying on ApiMessageType

2020-12-16 Thread GitBox


ijuma merged pull request #9748:
URL: https://github.com/apache/kafka/pull/9748


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9748: MINOR: Simplify ApiKeys by relying on ApiMessageType

2020-12-16 Thread GitBox


ijuma commented on pull request #9748:
URL: https://github.com/apache/kafka/pull/9748#issuecomment-746383611


   JDK8 and 11 passed, 15 had one flaky unrelated failure:
   
   > kafka.server.ClientQuotasRequestTest.testAlterIpQuotasRequest



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #9759: HOTFIX: Access apiversions data via method not field

2020-12-16 Thread GitBox


chia7712 merged pull request #9759:
URL: https://github.com/apache/kafka/pull/9759


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9759: HOTFIX: Access apiversions data via method not field

2020-12-16 Thread GitBox


chia7712 commented on pull request #9759:
URL: https://github.com/apache/kafka/pull/9759#issuecomment-746365374


   ```
   22:18:23  Use '--warning-mode all' to show the individual deprecation 
warnings.
   22:18:23  See 
https://docs.gradle.org/6.7.1/userguide/command_line_interface.html#sec:command_line_warnings
   22:18:23  
   22:18:23  BUILD SUCCESSFUL in 9m 3s
   22:18:23  205 actionable tasks: 171 executed, 34 up-to-date
   22:18:23  
   22:18:23  See the profiling report at: 
file:///home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-9759/build/reports/profile/profile-2020-12-16-14-09-11.html
   22:18:23  A fine-grained performance profile is available: use the --scan 
option.
   ```
   
   build successful :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9552: KAFKA-10656: Log the feature flags received by the client

2020-12-16 Thread GitBox


tombentley commented on pull request #9552:
URL: https://github.com/apache/kafka/pull/9552#issuecomment-746347168


   https://github.com/apache/kafka/pull/9759



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9759: HOTFIX: Access apiversions data via method not field

2020-12-16 Thread GitBox


tombentley commented on pull request #9759:
URL: https://github.com/apache/kafka/pull/9759#issuecomment-746344348


   cc @chia7712 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley opened a new pull request #9759: HOTFIX: Access apiversions data via method not field

2020-12-16 Thread GitBox


tombentley opened a new pull request #9759:
URL: https://github.com/apache/kafka/pull/9759


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9552: KAFKA-10656: Log the feature flags received by the client

2020-12-16 Thread GitBox


tombentley commented on pull request #9552:
URL: https://github.com/apache/kafka/pull/9552#issuecomment-746338918


   @chia7712 I'll get right on it



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9552: KAFKA-10656: Log the feature flags received by the client

2020-12-16 Thread GitBox


chia7712 commented on pull request #9552:
URL: https://github.com/apache/kafka/pull/9552#issuecomment-746336775


   @tombentley 
https://github.com/apache/kafka/commit/1a10c3445e157da1d2fd670c043f19c385465eb0 
changed the access modifier of "data", so this PR breaks the build. Could you 
file a hotfix for it? 
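
   A hedged sketch of the kind of breakage involved (the names below are 
stand-ins, not the actual Kafka classes): once a field's access modifier is 
tightened, callers must read it through an accessor method, which is what the 
hotfix title describes.
   
   ```java
   class ApiVersionsHolder {
       private final String data;                // was accessible to callers, now private
   
       ApiVersionsHolder(String data) { this.data = data; }
   
       String data() { return data; }            // accessor callers should use instead
   }
   
   class Caller {
       static String readData(ApiVersionsHolder holder) {
           // return holder.data;                // no longer compiles after the change
           return holder.data();                 // access via method, not field
       }
   }
   ```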



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10417) suppress() with cogroup() throws ClassCastException

2020-12-16 Thread Leah Thomas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leah Thomas resolved KAFKA-10417.
-
Resolution: Fixed

> suppress() with cogroup() throws ClassCastException
> ---
>
> Key: KAFKA-10417
> URL: https://issues.apache.org/jira/browse/KAFKA-10417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Wardha Perinkada Kattu
>Assignee: Leah Thomas
>Priority: Critical
>  Labels: kafka-streams
> Fix For: 2.8.0, 2.7.1
>
>
> Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` 
> throws `ClassCastException`
> Works fine without the `suppress()`
> Code block tested -
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
> .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
> .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde()))
> val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2, confirmationsAggregator)
> .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
> .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
> .withValueSerde(serdesConfig.notificationMetricSerde()))
> .suppress(Suppressed.untilWindowCloses(unbounded()))
> .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class 
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to 
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
> (org.apache.kafka.streams.kstream.internals.PassThrough and 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py

2020-12-16 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250330#comment-17250330
 ] 

Bill Bejeck commented on KAFKA-10292:
-

Since this is an outstanding issue, I'm changing the blocker status.  Also, 
I've updated the fix version to 2.8.0.

 

Thanks,

Bill

> fix flaky streams/streams_broker_bounce_test.py
> ---
>
> Key: KAFKA-10292
> URL: https://issues.apache.org/jira/browse/KAFKA-10292
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.8.0
>
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> \}
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> \}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py

2020-12-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10292:

Fix Version/s: 2.8.0

> fix flaky streams/streams_broker_bounce_test.py
> ---
>
> Key: KAFKA-10292
> URL: https://issues.apache.org/jira/browse/KAFKA-10292
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.8.0
>
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> \}
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> \}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py

2020-12-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10292:

Priority: Major  (was: Blocker)

> fix flaky streams/streams_broker_bounce_test.py
> ---
>
> Key: KAFKA-10292
> URL: https://issues.apache.org/jira/browse/KAFKA-10292
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> \}
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> \}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py

2020-12-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10292:

Fix Version/s: (was: 2.7.0)

> fix flaky streams/streams_broker_bounce_test.py
> ---
>
> Key: KAFKA-10292
> URL: https://issues.apache.org/jira/browse/KAFKA-10292
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> \}
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> \}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10656) NetworkClient.java: print out the feature flags received at DEBUG level, as well as the other version information

2020-12-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10656:
---
Fix Version/s: (was: 2.7.0)
   2.8.0

> NetworkClient.java: print out the feature flags received at DEBUG level, as 
> well as the other version information
> -
>
> Key: KAFKA-10656
> URL: https://issues.apache.org/jira/browse/KAFKA-10656
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Tom Bentley
>Priority: Major
> Fix For: 2.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10656) NetworkClient.java: print out the feature flags received at DEBUG level, as well as the other version information

2020-12-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10656.

Fix Version/s: 2.7.0
   Resolution: Fixed

> NetworkClient.java: print out the feature flags received at DEBUG level, as 
> well as the other version information
> -
>
> Key: KAFKA-10656
> URL: https://issues.apache.org/jira/browse/KAFKA-10656
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Tom Bentley
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #9552: KAFKA-10656: Log the feature flags received by the client

2020-12-16 Thread GitBox


chia7712 merged pull request #9552:
URL: https://github.com/apache/kafka/pull/9552


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r544223630



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,134 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on at least one
+ * resource of the given type.
+ *
+ * It is important to override this interface default in implementations because:
+ * 1. The interface default iterates over all AclBindings multiple times, without any indexing, which is CPU-intensive.
+ * 2. The interface default rebuilds several sets of strings, which is memory-intensive.
+ * 3. The interface default cannot perform audit logging properly.
+ *
+ * @param requestContext Request context including request resourceType, security protocol, and listener name
+ * @param op             The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
+ *         given ACL operation on at least one resource of the given type.
+ *         Return {@link AuthorizationResult#DENIED} otherwise.
+ */
+default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+    if (authorize(requestContext, Collections.singletonList(new Action(
+            AclOperation.READ,
+            new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL),
+            0, false, false)))

Review comment:
   Use `logIfAllowed=true` since we are granting access in that case.
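
   A minimal sketch of the suggested change, reusing the names from the diff 
above (the surrounding method is elided):
   
   ```java
   // Pass logIfAllowed=true so a successful authorization on this path is
   // audit-logged; the other arguments stay as in the diff.
   new Action(
       AclOperation.READ,
       new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL),
       0,      // resourceReferenceCount
       true,   // logIfAllowed (was false)
       false)  // logIfDenied
   ```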

##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
##
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.net.InetAddress
+import java.util.UUID
+
+import kafka.security.auth.SimpleAclAuthorizer
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl.AclOperation._
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourceType._
+import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.annotation.nowarn
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+class AuthorizerWrapperTest extends ZooKeeperTestHarness {
+  @nowarn("cat=deprecation")
+  private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new SimpleAclAuthorizer)
+  @nowarn("cat=deprecation")
+  private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new SimpleAclAuthorizer)
+  private var resource: ResourcePattern = _
+  private val superUsers = "User:superuser1; User:superuser2"
+  private val username = "alice"
+  private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+  private val requestContext = newRequestContext(principal, InetAddress.getByName("192.168.0.1"))
+  private var config: KafkaConfig = _
+  private var zooKeeperClient: ZooKeeperClient = _
+
+  private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], ResourcePattern)] = ArrayBuffer()
+  private val authorizerTestFactory = new AuthorizerTestFactory(

[GitHub] [kafka] UnityLung commented on pull request #9703: KAFKA-10697 Remove ProduceResponse.responses

2020-12-16 Thread GitBox


UnityLung commented on pull request #9703:
URL: https://github.com/apache/kafka/pull/9703#issuecomment-746161565


   @ijuma  Thanks for your reply.
   I'm going to remove "responses".



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] UnityLung closed pull request #9703: KAFKA-10697 Remove ProduceResponse.responses

2020-12-16 Thread GitBox


UnityLung closed pull request #9703:
URL: https://github.com/apache/kafka/pull/9703


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

2020-12-16 Thread GitBox


rajinisivaram commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r544204490



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
*/
 if (localLog(topicPartition).isEmpty)
   markPartitionOffline(topicPartition)
+else {
+  val id = topicIds.get(topicPartition.topic())
+  // Ensure we have not received a request from an older protocol
+  if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+    val log = localLog(topicPartition).get
+    // Check if the topic ID is in memory, if not, it must be new to the broker.
+    // If the broker previously wrote it to file, it would be recovered on restart after failure.
+    // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+    // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+    if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+      stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+        s" match the topic Id provided in the request: " +
+        s"${topicIds.get(topicPartition.topic).toString}.")
+    } else {
+      // There is not yet a topic ID stored in the log.
+      // Write the partition metadata file if it is empty.
+      if (log.partitionMetadataFile.get.isEmpty()) {
+        log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+        log.topicId = topicIds.get(topicPartition.topic)
+      } else {
+        stateChangeLogger.warn("Partition metadata file already contains content.")

Review comment:
   Hmm, looking at the conditional statements here, it looks like we would 
write the file the first time we get here because 
`log.partitionMetadataFile.get.isEmpty()` and the second time we would print a 
warning even if the id in the file matches the expected id. Unless I missed 
something.
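
   A hedged sketch (plain Java stand-in with illustrative names, not the 
actual Scala code) of the flow this comment argues for: write the metadata 
file only when it is empty, and warn only when a stored id genuinely conflicts 
with the id in the request.
   
   ```java
   import java.util.Optional;
   
   class PartitionMetadataSketch {
       private Optional<String> storedTopicId = Optional.empty();
   
       void onRequest(String requestTopicId) {
           if (!storedTopicId.isPresent()) {
               // First time here: persist the id from the request.
               storedTopicId = Optional.of(requestTopicId);
           } else if (!storedTopicId.get().equals(requestTopicId)) {
               // Only a genuine conflict warrants a warning.
               System.out.println("WARN topic id mismatch: stored=" + storedTopicId.get()
                       + " request=" + requestTopicId);
           }
           // Matching ids: nothing to do, and no warning.
       }
   }
   ```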





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test

2020-12-16 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250252#comment-17250252
 ] 

Tom Bentley commented on KAFKA-10859:
-

I think we should improve the test slightly anyway, to cover the case where the 
file is created after some number of polls. I'll open a PR after you've merged 
the fix for KAFKA-10846 since there's a dependency on a method added there.
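
A hedged JUnit 4 sketch (hypothetical test body, not the actual patch) of the 
two mechanical fixes the ticket below asks for: add the missing {{@Test}} 
annotation and cut the loop count so the test no longer takes about 2 minutes.

{code:java}
import static org.junit.Assert.assertNull;
import org.junit.Test;

public class FileStreamSourceTaskTestSketch {
    @Test   // previously missing, so the test method never actually ran
    public void testInvalidFile() {
        for (int i = 0; i < 3; i++) {   // was 100 iterations
            assertNull(pollAgainstMissingFile());
        }
    }

    // Stand-in for polling the task against a nonexistent file.
    private Object pollAgainstMissingFile() {
        return null;
    }
}
{code}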

> add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce 
> the loop count to speedup the test
> --
>
> Key: KAFKA-10859
> URL: https://issues.apache.org/jira/browse/KAFKA-10859
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Tom Bentley
>Priority: Major
>  Labels: newbie
>
> FileStreamSourceTaskTest.testInvalidFile misses a `@Test` annotation. Also, it 
> loops 100 times, which takes about 2 minutes to complete a unit test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test

2020-12-16 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250250#comment-17250250
 ] 

Chia-Ping Tsai commented on KAFKA-10859:


I prefer the small patch :)

> add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce 
> the loop count to speedup the test
> --
>
> Key: KAFKA-10859
> URL: https://issues.apache.org/jira/browse/KAFKA-10859
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Tom Bentley
>Priority: Major
>  Labels: newbie
>
> FileStreamSourceTaskTest.testInvalidFile misses a `@Test` annotation. Also, it 
> loops 100 times, which takes about 2 minutes to complete a unit test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10788) Streamlining Tests in CachingInMemoryKeyValueStoreTest

2020-12-16 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250249#comment-17250249
 ] 

Sagar Rao commented on KAFKA-10788:
---

Sure [~guozhang].

[~rohitdeshaws] here is the PR comment for your reference: 
[https://github.com/apache/kafka/pull/9508#discussion_r527599966]

The idea is to mock the `underlyingStore` field instead of creating an instance 
of it. MeteredKeyValueStoreTest already does something like this. 
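
A hedged sketch (assuming EasyMock, with hypothetical setup code) of mocking 
the underlying store rather than instantiating a real one:

{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.easymock.EasyMock;

public class CachingStoreMockSketch {
    @SuppressWarnings("unchecked")
    public static KeyValueStore<Bytes, byte[]> mockedUnderlyingStore() {
        // Mock the store interface instead of building an in-memory store.
        KeyValueStore<Bytes, byte[]> underlyingStore = EasyMock.mock(KeyValueStore.class);
        EasyMock.expect(underlyingStore.name()).andReturn("underlying-store").anyTimes();
        EasyMock.replay(underlyingStore);
        return underlyingStore;
    }
}
{code}

The caching store under test can then wrap the mock, so assertions verify 
delegation without exercising real storage.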

> Streamlining Tests in CachingInMemoryKeyValueStoreTest
> --
>
> Key: KAFKA-10788
> URL: https://issues.apache.org/jira/browse/KAFKA-10788
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Rohit Deshpande
>Priority: Major
>  Labels: newbie
>
> While reviewing KIP-614, it was decided that tests for 
> [CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588]
>  need to be streamlined to use a mocked underlyingStore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10652) Raft leader should flush accumulated writes after a min size is reached

2020-12-16 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250247#comment-17250247
 ] 

Sagar Rao commented on KAFKA-10652:
---

[~hachikuji], no problem. BTW, the queries are old and I have created a PR for 
this change. You can review whenever you get a chance.

> Raft leader should flush accumulated writes after a min size is reached
> ---
>
> Key: KAFKA-10652
> URL: https://issues.apache.org/jira/browse/KAFKA-10652
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Sagar Rao
>Priority: Major
>
> In KAFKA-10601, we implemented linger semantics similar to the producer to 
> let the leader accumulate a batch of writes before fsyncing them to disk. 
> Currently the fsync is only based on the linger time, but it would be helpful 
> to make it size-based as well. In other words, if we accumulate a 
> configurable N bytes, then we should not wait for linger expiration and 
> should just fsync immediately.
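
A hedged sketch (illustrative names, not the actual raft code) of the 
size-or-linger rule described above: flush as soon as the accumulated bytes 
reach a configured threshold, otherwise fall back to the linger deadline.

{code:java}
class FlushPolicySketch {
    private final long lingerMs;
    private final long flushMinBytes;
    private long accumulatedBytes = 0;
    private long firstAppendMs = -1;

    FlushPolicySketch(long lingerMs, long flushMinBytes) {
        this.lingerMs = lingerMs;
        this.flushMinBytes = flushMinBytes;
    }

    boolean shouldFlush(long nowMs, int appendedBytes) {
        if (firstAppendMs < 0)
            firstAppendMs = nowMs;
        accumulatedBytes += appendedBytes;
        // Flush when either the size threshold or the linger deadline is hit.
        return accumulatedBytes >= flushMinBytes || nowMs - firstAppendMs >= lingerMs;
    }

    void onFlushed() {
        accumulatedBytes = 0;
        firstAppendMs = -1; // the next append restarts the linger timer
    }
}
{code}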



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vamossagar12 commented on a change in pull request #9737: KAFKA-10828: Replacing endorsing with acknowledging for voters

2020-12-16 Thread GitBox


vamossagar12 commented on a change in pull request #9737:
URL: https://github.com/apache/kafka/pull/9737#discussion_r544195331



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -50,8 +50,8 @@ protected LeaderState(
 this.highWatermark = Optional.empty();
 
 for (int voterId : voters) {
-boolean hasEndorsedLeader = voterId == localId;
-this.voterReplicaStates.put(voterId, new VoterState(voterId, 
hasEndorsedLeader));
+boolean hasAcknowledgedLeader = voterId == localId;

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10759) ARM support for Kafka

2020-12-16 Thread PengLei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250234#comment-17250234
 ] 

PengLei commented on KAFKA-10759:
-

On the ARM machine, I completed the compilation of Kafka and ran all the test 
cases successfully. For details, please see the attached log files.

Environment of the machine:

OS : Ubuntu-18.04-Arm

JDK: Oracle Jdk1.8 (Arm)

Gradle: Gradle6.7.1

> ARM support for Kafka
> -
>
> Key: KAFKA-10759
> URL: https://issues.apache.org/jira/browse/KAFKA-10759
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: PengLei
>Priority: Major
> Attachments: build_output.log, run_test_output.log
>
>
> ARM support for Kafka.
> I tried to deploy a Kafka cluster on an ARM server, but unfortunately I 
> did not find an official ARM release for Kafka. I think more and more 
> people will try the same thing as I did.
> Now the CI of Kafka (on GitHub) is handled by jenkins-ci. While the tests 
> run under the x86 arch, the ARM arch is missing. This leads to a problem: 
> we have no way to test whether a pull request will break the Kafka 
> deployment on ARM. Similarly, we cannot provide an ARM release 
> package without ARM CI.
> If the Apache Kafka community is interested, I can help with the 
> integration.
> This is the umbrella issue to track the efforts to make Kafka run on ARM 
> processors.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test

2020-12-16 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250230#comment-17250230
 ] 

Tom Bentley commented on KAFKA-10859:
-

[~chia7712] good spot! I can add this to my PR if you like?

> add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce 
> the loop count to speedup the test
> --
>
> Key: KAFKA-10859
> URL: https://issues.apache.org/jira/browse/KAFKA-10859
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Tom Bentley
>Priority: Major
>  Labels: newbie
>
> FileStreamSourceTaskTest.testInvalidFile misses a `@Test` annotation. Also, it 
> loops 100 times, which takes about 2 minutes to complete a unit test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-12-16 Thread GitBox


rajinisivaram commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544150030



##
File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
##
@@ -223,6 +224,31 @@ class MetadataRequestTest extends BaseRequestTest {
 assertEquals("V1 Response should have 2 (all) topics", 2, 
metadataResponseV1.topicMetadata.size())
   }
 
+  @Test
+  def testTopicIdsInResponse(): Unit = {
+    val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    createTopic(topic1, replicaAssignment)
+    createTopic(topic2, replicaAssignment)
+
+    // if version < 9, return ZERO_UUID in MetadataResponse
+    val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 0, 9).build(), Some(controllerSocketServer))
+    assertEquals(2, resp1.topicMetadata.size)
+    resp1.topicMetadata.forEach { topicMetadata =>
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertEquals(Uuid.ZERO_UUID, topicMetadata.topicId())
+    }
+
+    // from version 10, UUID will be included in MetadataResponse
+    val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 10, 10).build(), Some(notControllerSocketServer))
+    assertEquals(2, resp2.topicMetadata.size)
+    resp2.topicMetadata.forEach { topicMetadata =>
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertNotEquals(Uuid.ZERO_UUID, topicMetadata.topicId())

Review comment:
   we probably also want to assert that the topic id is not null here (even 
though we currently never return null).

##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
   error(s"Listeners are not identical across brokers: $aliveNodes")
   }
 
+  val newTopicIds = updateMetadataRequest.topicStates().asScala
+    .map(topicState => (topicState.topicName(), topicState.topicId()))
+    .filter(_._2 != Uuid.ZERO_UUID).toMap
+  val topicIds = mutable.Map.empty[String, Uuid]
+  topicIds.addAll(metadataSnapshot.topicIds)
+  topicIds.addAll(newTopicIds)

Review comment:
   When a topic is deleted, brokers process UpdateMetadataRequest and 
remove deleted topics from their cache. We track deletion state in ZooKeeper 
and as you mentioned, you can get this information by directly going to ZK in 
kafka-topics.sh. But we don't retain that information in every broker. I would 
remove the topic id in the code segment just below this when the topic is removed 
from the MetadataCache, since we clearly cannot have a map that keeps growing in 
brokers. Is there a reason why we would want to retain the topic id in every broker 
even after the topic has been deleted? We can't get this information through 
existing metadata request from brokers anyway. I guess in future, we can add 
additional metadata to track deleted topic ids if we wanted to, but for now it 
seems better to delete topic ids from MetadataCache when we delete the topic 
from the cache.  What do you think?
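
   A hedged sketch (plain Java stand-in, not the Scala MetadataCache) of the 
suggestion: drop a topic's id at the same point the topic itself is removed 
from the cache, so the id map cannot grow without bound.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   class TopicIdCacheSketch {
       private final Map<String, String> topicIds = new HashMap<>();
   
       void onTopicUpdated(String topic, String topicId) {
           topicIds.put(topic, topicId);
       }
   
       void onTopicDeleted(String topic) {
           // Remove the id together with the rest of the topic metadata.
           topicIds.remove(topic);
       }
   }
   ```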

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
##
@@ -320,6 +333,7 @@ public String toString() {
 return "TopicMetadata{" +
 "error=" + error +
 ", topic='" + topic + '\'' +
+", topicId='" + topicId.toString() + '\'' +

Review comment:
   nit: toString() unnecessary





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test

2020-12-16 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley reassigned KAFKA-10859:
---

Assignee: Tom Bentley

> add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce 
> the loop count to speedup the test
> --
>
> Key: KAFKA-10859
> URL: https://issues.apache.org/jira/browse/KAFKA-10859
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Tom Bentley
>Priority: Major
>  Labels: newbie
>
> FileStreamSourceTaskTest.testInvalidFile misses a `@Test` annotation. Also, it 
> loops 100 times, which takes about 2 minutes to complete a unit test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py

2020-12-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reopened KAFKA-10292:


Thanks for the information. Reopening it.

> fix flaky streams/streams_broker_bounce_test.py
> ---
>
> Key: KAFKA-10292
> URL: https://issues.apache.org/jira/browse/KAFKA-10292
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.7.0
>
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> \}
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> \}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py

2020-12-16 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250224#comment-17250224
 ] 

Bruno Cadonna edited comment on KAFKA-10292 at 12/16/20, 10:14 AM:
---

[~chia7712] Sorry for the late reply. The fix solves only one of the issues 
with that test. Actually it solves {{test_all_brokers_bounce}}, but not 
{{test_broker_type_bounce}}. A fix for the latter is in progress here:

https://github.com/apache/kafka/pull/9441 

I think we should reopen this ticket.


was (Author: cadonna):
[~chia7712] Sorry for the late reply. I think the fix solves only one of the 
issues with that test. Actually it solves {{test_all_brokers_bounce}}, but not 
{{test_broker_type_bounce}}. A fix for the latter is in progress here:

https://github.com/apache/kafka/pull/9441 

I think we should reopen this ticket.

> fix flaky streams/streams_broker_bounce_test.py
> ---
>
> Key: KAFKA-10292
> URL: https://issues.apache.org/jira/browse/KAFKA-10292
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.7.0
>
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> \}
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> \}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py

2020-12-16 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250224#comment-17250224
 ] 

Bruno Cadonna commented on KAFKA-10292:
---

[~chia7712] Sorry for the late reply. I think the fix solves only one of the 
issues with that test. Actually it solves {{test_all_brokers_bounce}}, but not 
{{test_broker_type_bounce}}. A fix for the latter is in progress here:

https://github.com/apache/kafka/pull/9441 

I think we should reopen this ticket.

> fix flaky streams/streams_broker_bounce_test.py
> ---
>
> Key: KAFKA-10292
> URL: https://issues.apache.org/jira/browse/KAFKA-10292
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.7.0
>
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> \}
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> \}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test

2020-12-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10859:
--

 Summary: add @Test annotation to 
FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup 
the test
 Key: KAFKA-10859
 URL: https://issues.apache.org/jira/browse/KAFKA-10859
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai


FileStreamSourceTaskTest.testInvalidFile misses a `@Test` annotation. Also, it 
loops 100 times, which takes about 2 minutes to complete a unit test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test

2020-12-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10859:
---
Labels: newbie  (was: )

> add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce 
> the loop count to speedup the test
> --
>
> Key: KAFKA-10859
> URL: https://issues.apache.org/jira/browse/KAFKA-10859
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Priority: Major
>  Labels: newbie
>
> FileStreamSourceTaskTest.testInvalidFile misses a `@Test` annotation. Also, it 
> loops 100 times, which takes about 2 minutes to complete a unit test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9735: KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed

2020-12-16 Thread GitBox


chia7712 commented on pull request #9735:
URL: https://github.com/apache/kafka/pull/9735#issuecomment-746011953


   >  I think testNormalLifecycleWithResize() covers that when a buffer resize 
is required, and testNormalLifecycle() in the case where a buffer resize is not 
required. Or did I miss a case?
   
   Sorry that I overlooked ```testNormalLifecycleWithResize```. It is a little 
hard to review code on an iPhone. Will take another look later.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10858) Convert connect protocol header schemas to use generated protocol

2020-12-16 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250212#comment-17250212
 ] 

dengziming edited comment on KAFKA-10858 at 12/16/20, 9:54 AM:
---

[~chia7712] Thank you. I searched the issues but failed to find the other PR; I 
will spend some time reviewing [https://github.com/apache/kafka/pull/9641].


was (Author: dengziming):
[~chia7712] Thank you. I searched the issues but failed to find the other PR; I 
will spend some time reviewing it.

> Convert connect protocol header schemas to use generated protocol
> -
>
> Key: KAFKA-10858
> URL: https://issues.apache.org/jira/browse/KAFKA-10858
> Project: Kafka
>  Issue Type: Improvement
>  Components: protocol
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Manually managed schema code in ConnectProtocol and
>  IncrementalCooperativeConnectProtocol should be replaced by the auto-generated 
> protocol.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10858) Convert connect protocol header schemas to use generated protocol

2020-12-16 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250212#comment-17250212
 ] 

dengziming commented on KAFKA-10858:


[~chia7712] Thank you. I searched the issues but failed to find the other PR; I 
will spend some time reviewing it.

> Convert connect protocol header schemas to use generated protocol
> -
>
> Key: KAFKA-10858
> URL: https://issues.apache.org/jira/browse/KAFKA-10858
> Project: Kafka
>  Issue Type: Improvement
>  Components: protocol
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Manually managed schema code in ConnectProtocol and
>  IncrementalCooperativeConnectProtocol should be replaced by the auto-generated 
> protocol.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10858) Convert connect protocol header schemas to use generated protocol

2020-12-16 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-10858.

Resolution: Duplicate

> Convert connect protocol header schemas to use generated protocol
> -
>
> Key: KAFKA-10858
> URL: https://issues.apache.org/jira/browse/KAFKA-10858
> Project: Kafka
>  Issue Type: Improvement
>  Components: protocol
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Manually managed schema code in ConnectProtocol and
>  IncrementalCooperativeConnectProtocol should be replaced by the auto-generated 
> protocol.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on pull request #9735: KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed

2020-12-16 Thread GitBox


tombentley commented on pull request #9735:
URL: https://github.com/apache/kafka/pull/9735#issuecomment-746008036


   @chia7712 I think `testNormalLifecycleWithResize()` covers that when a 
buffer resize is required, and `testNormalLifecycle()` in the case where a 
buffer resize is not required. Or did I miss a case?
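
   As a hedged illustration of the "grow only when needed" behaviour named in 
the PR title (illustrative names, not the actual FileStreamSourceTask code): 
the read buffer doubles only once the pending bytes no longer fit.
   
   ```java
   import java.util.Arrays;
   
   class GrowOnlyBufferSketch {
       private byte[] buffer = new byte[1024];
       private int used = 0;
   
       void append(byte[] chunk) {
           // Grow only on demand, doubling until the chunk fits.
           while (used + chunk.length > buffer.length) {
               buffer = Arrays.copyOf(buffer, buffer.length * 2);
           }
           System.arraycopy(chunk, 0, buffer, used, chunk.length);
           used += chunk.length;
       }
   
       int capacity() {
           return buffer.length; // exposed so a test can assert on resizing
       }
   }
   ```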



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9735: KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed

2020-12-16 Thread GitBox


tombentley commented on pull request #9735:
URL: https://github.com/apache/kafka/pull/9735#issuecomment-745997459


   Thanks for taking a look @chia7712. I've added a test, but had to expose the 
buffer size. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py

2020-12-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10292.

Fix Version/s: (was: 2.8.0)
   2.7.0
   Resolution: Fixed

> fix flaky streams/streams_broker_bounce_test.py
> ---
>
> Key: KAFKA-10292
> URL: https://issues.apache.org/jira/browse/KAFKA-10292
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.7.0
>
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> \}
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> \{
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> \}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

